diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index 0f26c3d6..fd8dfcb1 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -588,7 +588,7 @@ public class Scheduler implements Runnable { checkpoint); ShardConsumerArgument argument = new ShardConsumerArgument(shardInfo, streamName, - leaseRefresher, + leaseCoordinator, executorService, cache, shardRecordProcessorFactory.shardRecordProcessor(), diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java index 6aaaff60..578af465 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java @@ -78,22 +78,21 @@ public class HierarchicalShardSyncer { final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards, final MetricsScope scope) throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { - final List shards = getShardList(shardDetector); - checkAndCreateLeaseForNewShards(shards, shardDetector, leaseRefresher, initialPosition, cleanupLeasesOfCompletedShards, - ignoreUnexpectedChildShards, scope); + final List latestShards = getShardList(shardDetector); + checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, cleanupLeasesOfCompletedShards, + ignoreUnexpectedChildShards, scope, latestShards); } //Provide a pre-collcted list of shards to avoid calling ListShards API - public synchronized void checkAndCreateLeaseForNewShards(List shards, @NonNull final ShardDetector shardDetector, + public synchronized void checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector, final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition, final boolean cleanupLeasesOfCompletedShards, - final boolean ignoreUnexpectedChildShards, final MetricsScope scope)throws DependencyException, InvalidStateException, + final boolean ignoreUnexpectedChildShards, final MetricsScope scope, List latestShards)throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { - if(CollectionUtils.isNullOrEmpty(shards)) { - shards = getShardList(shardDetector); + if (!CollectionUtils.isNullOrEmpty(latestShards)) { + log.debug("Num shards: {}", latestShards.size()); } - log.debug("Num shards: {}", shards.size()); - final Map shardIdToShardMap = constructShardIdToShardMap(shards); + final Map shardIdToShardMap = constructShardIdToShardMap(latestShards); final Map> shardIdToChildShardIdsMap = constructShardIdToChildShardIdsMap( shardIdToShardMap); final Set inconsistentShardIds = findInconsistentShardIds(shardIdToChildShardIdsMap, shardIdToShardMap); @@ -103,8 +102,7 @@ public class HierarchicalShardSyncer { final List currentLeases = leaseRefresher.listLeases(); - final List newLeasesToCreate = determineNewLeasesToCreate(shards, currentLeases, initialPosition, - inconsistentShardIds); + final List newLeasesToCreate = determineNewLeasesToCreate(latestShards, currentLeases, initialPosition, inconsistentShardIds); log.debug("Num new leases to create: {}", newLeasesToCreate.size()); for (Lease lease : newLeasesToCreate) { long startTime = System.currentTimeMillis(); @@ -116,13 +114,11 @@ public class HierarchicalShardSyncer { MetricsUtil.addSuccessAndLatency(scope, "CreateLease", success, startTime, MetricsLevel.DETAILED); } } - final List trackedLeases = new ArrayList<>(currentLeases); trackedLeases.addAll(newLeasesToCreate); - cleanupGarbageLeases(shardDetector, shards, trackedLeases, leaseRefresher); + cleanupGarbageLeases(shardDetector, latestShards, trackedLeases, leaseRefresher); if (cleanupLeasesOfCompletedShards) { - cleanupLeasesOfFinishedShards(currentLeases, shardIdToShardMap, shardIdToChildShardIdsMap, trackedLeases, - leaseRefresher); + cleanupLeasesOfFinishedShards(currentLeases, shardIdToShardMap, shardIdToChildShardIdsMap, trackedLeases, leaseRefresher); } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java index df7ea6aa..bb1788b2 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java @@ -135,7 +135,7 @@ class ConsumerStates { @Override public ConsumerTask createTask(ShardConsumerArgument consumerArgument, ShardConsumer consumer, ProcessRecordsInput input) { return new BlockOnParentShardTask(consumerArgument.shardInfo(), - consumerArgument.leaseRefresher(), + consumerArgument.leaseCoordinator().leaseRefresher(), consumerArgument.parentShardPollIntervalMillis()); } @@ -492,7 +492,7 @@ class ConsumerStates { argument.initialPositionInStream(), argument.cleanupLeasesOfCompletedShards(), argument.ignoreUnexpectedChildShards(), - argument.leaseRefresher(), + argument.leaseCoordinator(), argument.taskBackoffTimeMillis(), argument.recordsPublisher(), argument.hierarchicalShardSyncer(), diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java index fcc49f63..4f1db733 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java @@ -21,6 +21,7 @@ import lombok.experimental.Accessors; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardInfo; @@ -42,7 +43,7 @@ public class ShardConsumerArgument { @NonNull private final String streamName; @NonNull - private final LeaseRefresher leaseRefresher; + private final LeaseCoordinator leaseCoordinator; @NonNull private final ExecutorService executorService; @NonNull diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index a5cbdf64..2bfcd358 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -20,10 +20,14 @@ import com.sun.org.apache.bcel.internal.generic.LUSHR; import lombok.NonNull; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import software.amazon.awssdk.services.kinesis.model.Shard; +import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.leases.Lease; +import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardInfo; @@ -39,6 +43,7 @@ import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import java.util.ArrayList; +import java.util.Collection; import java.util.List; /** @@ -60,13 +65,13 @@ public class ShutdownTask implements ConsumerTask { @NonNull private final ShardRecordProcessorCheckpointer recordProcessorCheckpointer; @NonNull - private ShutdownReason reason; + private final ShutdownReason reason; @NonNull private final InitialPositionInStreamExtended initialPositionInStream; private final boolean cleanupLeasesOfCompletedShards; private final boolean ignoreUnexpectedChildShards; @NonNull - private final LeaseRefresher leaseRefresher; + private final LeaseCoordinator leaseCoordinator; private final long backoffTimeMillis; @NonNull private final RecordsPublisher recordsPublisher; @@ -93,29 +98,38 @@ public class ShutdownTask implements ConsumerTask { try { try { - List allShards = new ArrayList<>(); - if(reason == ShutdownReason.SHARD_END) { - allShards = shardDetector.listShards(); + ShutdownReason localReason = reason; + List latestShards = null; + /* + * Revalidate if the current shard is closed before shutting down the shard consumer with reason SHARD_END + * If current shard is not closed, shut down the shard consumer with reason LEASE_LOST that allows + * active workers to contend for the lease of this shard. + */ + if (localReason == ShutdownReason.SHARD_END) { + latestShards = shardDetector.listShards(); - if(!isRealShardEnd(allShards)) { - reason = ShutdownReason.LEASE_LOST; + //If latestShards is empty, should also shutdown the ShardConsumer without checkpoint with SHARD_END + if (CollectionUtils.isNullOrEmpty(latestShards) || !isShardInContextParentOfAny(latestShards)) { + localReason = ShutdownReason.LEASE_LOST; + dropLease(); + log.info("Forcing the lease to be lost before shutting down the consumer for Shard: " + shardInfo.shardId()); } } // If we reached end of the shard, set sequence number to SHARD_END. - if (reason == ShutdownReason.SHARD_END) { + if (localReason == ShutdownReason.SHARD_END) { recordProcessorCheckpointer .sequenceNumberAtShardEnd(recordProcessorCheckpointer.largestPermittedCheckpointValue()); recordProcessorCheckpointer.largestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END); } log.debug("Invoking shutdown() for shard {}, concurrencyToken {}. Shutdown reason: {}", - shardInfo.shardId(), shardInfo.concurrencyToken(), reason); - final ShutdownInput shutdownInput = ShutdownInput.builder().shutdownReason(reason) + shardInfo.shardId(), shardInfo.concurrencyToken(), localReason); + final ShutdownInput shutdownInput = ShutdownInput.builder().shutdownReason(localReason) .checkpointer(recordProcessorCheckpointer).build(); final long startTime = System.currentTimeMillis(); try { - if (reason == ShutdownReason.SHARD_END) { + if (localReason == ShutdownReason.SHARD_END) { shardRecordProcessor.shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build()); ExtendedSequenceNumber lastCheckpointValue = recordProcessorCheckpointer.lastCheckpointValue(); if (lastCheckpointValue == null @@ -137,11 +151,11 @@ public class ShutdownTask implements ConsumerTask { MetricsUtil.addLatency(scope, RECORD_PROCESSOR_SHUTDOWN_METRIC, startTime, MetricsLevel.SUMMARY); } - if (reason == ShutdownReason.SHARD_END) { + if (localReason == ShutdownReason.SHARD_END) { log.debug("Looking for child shards of shard {}", shardInfo.shardId()); // create leases for the child shards - hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(allShards, shardDetector, leaseRefresher, - initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope); + hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseCoordinator.leaseRefresher(), + initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, latestShards); log.debug("Finished checking for child shards of shard {}", shardInfo.shardId()); } @@ -183,17 +197,26 @@ public class ShutdownTask implements ConsumerTask { return reason; } - private boolean isRealShardEnd(List shards) { - boolean realShardEnd = false; - + private boolean isShardInContextParentOfAny(List shards) { for(Shard shard : shards) { - if(shard.parentShardId() != null && shard.parentShardId().equals(shardInfo.shardId()) - || shard.adjacentParentShardId() != null && shard.adjacentParentShardId().equals(shardInfo.shardId())) { - realShardEnd = true; - break; + if (isChildShardOfShardInContext(shard)) { + return true; } } - return realShardEnd; + return false; + } + + private boolean isChildShardOfShardInContext(Shard shard) { + return (StringUtils.equals(shard.parentShardId(), shardInfo.shardId()) + || StringUtils.equals(shard.adjacentParentShardId(), shardInfo.shardId())); + } + + private void dropLease() { + Lease currentLease = leaseCoordinator.getCurrentlyHeldLease(shardInfo.shardId()); + leaseCoordinator.dropLease(currentLease); + if(currentLease != null) { + log.warn("Dropped lease for shutting down ShardConsumer: " + currentLease.leaseKey()); + } } } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java index 5d4ac902..e81e1d52 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java @@ -171,6 +171,9 @@ public class HierarchicalShardSyncerTest { testCheckAndCreateLeasesForShardsIfMissing(INITIAL_POSITION_LATEST); } + /** + * Test checkAndCreateLeaseForNewShards while not providing a pre-fetched list of shards + */ @Test public void testCheckAndCreateLeasesForShardsIfMissingAtLatest() throws Exception { final List shards = constructShardListForGraphA(); @@ -205,6 +208,74 @@ public class HierarchicalShardSyncerTest { } + /** + * Test checkAndCreateLeaseForNewShards with a pre-fetched list of shards. In this scenario, shardDetector.listShards() + * should never be called. + */ + @Test + public void testCheckAndCreateLeasesForShardsWithShardList() throws Exception { + final List latestShards = constructShardListForGraphA(); + + final ArgumentCaptor leaseCaptor = ArgumentCaptor.forClass(Lease.class); + when(shardDetector.listShards()).thenReturn(latestShards); + when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList()); + when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true); + + hierarchicalShardSyncer + .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, + cleanupLeasesOfCompletedShards, false, SCOPE, latestShards); + + final Set expectedShardIds = new HashSet<>( + Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10")); + + final List requestLeases = leaseCaptor.getAllValues(); + final Set requestLeaseKeys = requestLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); + final Set extendedSequenceNumbers = requestLeases.stream().map(Lease::checkpoint) + .collect(Collectors.toSet()); + + assertThat(requestLeases.size(), equalTo(expectedShardIds.size())); + assertThat(requestLeaseKeys, equalTo(expectedShardIds)); + assertThat(extendedSequenceNumbers.size(), equalTo(1)); + + extendedSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST))); + + verify(shardDetector, never()).listShards(); + verify(dynamoDBLeaseRefresher, times(expectedShardIds.size())).createLeaseIfNotExists(any(Lease.class)); + verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); + } + + /** + * Test checkAndCreateLeaseForNewShards with an empty list of shards. In this scenario, shardDetector.listShards() + * should never be called. + */ + @Test + public void testCheckAndCreateLeasesForShardsWithEmptyShardList() throws Exception { + final List shards = constructShardListForGraphA(); + + final ArgumentCaptor leaseCaptor = ArgumentCaptor.forClass(Lease.class); + when(shardDetector.listShards()).thenReturn(shards); + when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList()); + when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true); + + hierarchicalShardSyncer + .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, + cleanupLeasesOfCompletedShards, false, SCOPE, new ArrayList()); + + final Set expectedShardIds = new HashSet<>(); + + final List requestLeases = leaseCaptor.getAllValues(); + final Set requestLeaseKeys = requestLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); + final Set extendedSequenceNumbers = requestLeases.stream().map(Lease::checkpoint) + .collect(Collectors.toSet()); + + assertThat(requestLeases.size(), equalTo(expectedShardIds.size())); + assertThat(extendedSequenceNumbers.size(), equalTo(0)); + + verify(shardDetector, never()).listShards(); + verify(dynamoDBLeaseRefresher, never()).createLeaseIfNotExists(any(Lease.class)); + verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); + } + @Test public void testCheckAndCreateLeasesForShardsWithShardList() throws Exception { final List shards = constructShardListForGraphA(); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java index f9287701..16f5e9a4 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java @@ -43,6 +43,7 @@ import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardInfo; @@ -55,6 +56,8 @@ import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.retrieval.AggregatorUtil; import software.amazon.kinesis.retrieval.RecordsPublisher; +import javax.swing.*; + @RunWith(MockitoJUnitRunner.class) public class ConsumerStatesTest { private static final String STREAM_NAME = "TestStream"; @@ -73,6 +76,8 @@ public class ConsumerStatesTest { @Mock private ShardInfo shardInfo; @Mock + private LeaseCoordinator leaseCoordinator; + @Mock private LeaseRefresher leaseRefresher; @Mock private Checkpointer checkpointer; @@ -109,7 +114,7 @@ public class ConsumerStatesTest { @Before public void setup() { - argument = new ShardConsumerArgument(shardInfo, STREAM_NAME, leaseRefresher, executorService, recordsPublisher, + argument = new ShardConsumerArgument(shardInfo, STREAM_NAME, leaseCoordinator, executorService, recordsPublisher, shardRecordProcessor, checkpointer, recordProcessorCheckpointer, parentShardPollIntervalMillis, taskBackoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, listShardsBackoffTimeInMillis, maxListShardsRetryAttempts, shouldCallProcessRecordsEvenForEmptyRecordList, idleTimeInMillis, @@ -127,6 +132,7 @@ public class ConsumerStatesTest { @Test public void blockOnParentStateTest() { ConsumerState state = ShardConsumerState.WAITING_ON_PARENT_SHARDS.consumerState(); + when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher); ConsumerTask task = state.createTask(argument, consumer, null); @@ -309,7 +315,7 @@ public class ConsumerStatesTest { assertThat(task, shutdownTask(ShardRecordProcessorCheckpointer.class, "recordProcessorCheckpointer", equalTo(recordProcessorCheckpointer))); assertThat(task, shutdownTask(ShutdownReason.class, "reason", equalTo(reason))); - assertThat(task, shutdownTask(LEASE_REFRESHER_CLASS, "leaseRefresher", equalTo(leaseRefresher))); + assertThat(task, shutdownTask(LeaseCoordinator.class, "leaseCoordinator", equalTo(leaseCoordinator))); assertThat(task, shutdownTask(InitialPositionInStreamExtended.class, "initialPositionInStream", equalTo(initialPositionInStream))); assertThat(task, diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java index 8a62bb6c..6af62edb 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java @@ -42,6 +42,8 @@ import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException; import software.amazon.kinesis.leases.HierarchicalShardSyncer; +import software.amazon.kinesis.leases.Lease; +import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardInfo; @@ -83,6 +85,8 @@ public class ShutdownTaskTest { @Mock private LeaseRefresher leaseRefresher; @Mock + private LeaseCoordinator leaseCoordinator; + @Mock private ShardDetector shardDetector; @Mock private HierarchicalShardSyncer hierarchicalShardSyncer; @@ -99,12 +103,13 @@ public class ShutdownTaskTest { task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer, SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, - ignoreUnexpectedChildShards, leaseRefresher, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, + ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, hierarchicalShardSyncer, NULL_METRICS_FACTORY); } /** * Test method for {@link ShutdownTask#call()}. + * This test is for the scenario that customer doesn't implement checkpoint in their implementation */ @Test public final void testCallWhenApplicationDoesNotCheckpoint() { @@ -118,19 +123,21 @@ public class ShutdownTaskTest { /** * Test method for {@link ShutdownTask#call()}. + * This test is for the scenario that checkAndCreateLeaseForNewShards throws an exception. */ @Test public final void testCallWhenSyncingShardsThrows() throws Exception { - List shards = constructShardListGraphA(); - when(shardDetector.listShards()).thenReturn(shards); + List latestShards = constructShardListGraphA(); + when(shardDetector.listShards()).thenReturn(latestShards); when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END); + when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher); doAnswer((invocation) -> { throw new KinesisClientLibIOException("KinesisClientLibIOException"); }).when(hierarchicalShardSyncer) - .checkAndCreateLeaseForNewShards(shards, shardDetector, leaseRefresher, INITIAL_POSITION_TRIM_HORIZON, + .checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, - NULL_METRICS_FACTORY.createMetrics()); + NULL_METRICS_FACTORY.createMetrics(), latestShards); final TaskResult result = task.call(); assertNotNull(result.getException()); @@ -142,6 +149,7 @@ public class ShutdownTaskTest { /** * Test method for {@link ShutdownTask#call()}. + * This test is for the scenario that ShutdownTask is created for ShardConsumer reaching the Shard End. */ @Test public final void testCallWhenTrueShardEnd() { @@ -149,7 +157,7 @@ public class ShutdownTaskTest { ExtendedSequenceNumber.LATEST); task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer, SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, - ignoreUnexpectedChildShards, leaseRefresher, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, + ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, hierarchicalShardSyncer, NULL_METRICS_FACTORY); when(shardDetector.listShards()).thenReturn(constructShardListGraphA()); @@ -161,10 +169,12 @@ public class ShutdownTaskTest { verify(shardRecordProcessor).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build()); verify(shardRecordProcessor, never()).leaseLost(LeaseLostInput.builder().build()); verify(shardDetector, times(1)).listShards(); + verify(leaseCoordinator, never()).getAssignments(); } /** * Test method for {@link ShutdownTask#call()}. + * This test is for the scenario that a ShutdownTask is created for detecting a false Shard End. */ @Test public final void testCallWhenFalseShardEnd() { @@ -172,7 +182,7 @@ public class ShutdownTaskTest { ExtendedSequenceNumber.LATEST); task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer, SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, - ignoreUnexpectedChildShards, leaseRefresher, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, + ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, hierarchicalShardSyncer, NULL_METRICS_FACTORY); when(shardDetector.listShards()).thenReturn(constructShardListGraphA()); @@ -183,10 +193,12 @@ public class ShutdownTaskTest { verify(shardRecordProcessor, never()).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build()); verify(shardRecordProcessor).leaseLost(LeaseLostInput.builder().build()); verify(shardDetector, times(1)).listShards(); + verify(leaseCoordinator).getCurrentlyHeldLease(shardInfo.shardId()); } /** * Test method for {@link ShutdownTask#call()}. + * This test is for the scenario that a ShutdownTask is created for the ShardConsumer losing the lease. */ @Test public final void testCallWhenLeaseLost() { @@ -194,7 +206,7 @@ public class ShutdownTaskTest { ExtendedSequenceNumber.LATEST); task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer, LEASE_LOST_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, - ignoreUnexpectedChildShards, leaseRefresher, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, + ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, hierarchicalShardSyncer, NULL_METRICS_FACTORY); when(shardDetector.listShards()).thenReturn(constructShardListGraphA()); @@ -205,6 +217,7 @@ public class ShutdownTaskTest { verify(shardRecordProcessor, never()).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build()); verify(shardRecordProcessor).leaseLost(LeaseLostInput.builder().build()); verify(shardDetector, never()).listShards(); + verify(leaseCoordinator, never()).getAssignments(); } /** diff --git a/pom.xml b/pom.xml index 04bcaf7c..b847f18a 100644 --- a/pom.xml +++ b/pom.xml @@ -63,11 +63,11 @@ ossrh - https://oss.sonatype.org/content/repositories/snapshots + https://aws.oss.sonatype.org/content/repositories/snapshots ossrh - https://oss.sonatype.org/service/local/staging/deploy/maven2/ + https://aws.oss.sonatype.org/service/local/staging/deploy/maven2/ @@ -114,7 +114,7 @@ true sonatype-nexus-staging - https://oss.sonatype.org + https://aws.oss.sonatype.org false