From af46774a11736bd73da1474e7cfaab8ba6f29019 Mon Sep 17 00:00:00 2001 From: Shitanshu Aggarwal Date: Mon, 21 Oct 2019 02:58:50 +0000 Subject: [PATCH] adding test to verify that worker shutdown does not block on completed leases --- .../clientlibrary/lib/worker/WorkerTest.java | 59 ++++++++++++++++++- 1 file changed, 57 insertions(+), 2 deletions(-) diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index 23c91269..438215f1 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -21,7 +21,6 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; @@ -53,7 +52,6 @@ import java.util.HashSet; import java.util.List; import java.util.ListIterator; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.Callable; @@ -1100,6 +1098,63 @@ public class WorkerTest { } + private String randomShardId() { + return UUID.randomUUID().toString(); + } + + @Test + public void testShutdownDoesNotBlockOnCompletedLeases() throws Exception { + final String shardId = randomShardId(); + final String parentShardId = randomShardId(); + final KinesisClientLease completedLease = mock(KinesisClientLease.class); + when(completedLease.getLeaseKey()).thenReturn(shardId); + when(completedLease.getParentShardIds()).thenReturn(Collections.singleton(parentShardId)); + when(completedLease.getCheckpoint()).thenReturn(ExtendedSequenceNumber.SHARD_END); + when(completedLease.getConcurrencyToken()).thenReturn(UUID.randomUUID()); + final StreamConfig streamConfig = mock(StreamConfig.class); + final IMetricsFactory metricsFactory = mock(IMetricsFactory.class); + final List leases = Collections.singletonList(completedLease); + final List currentAssignments = new ArrayList<>(); + + when(leaseCoordinator.getAssignments()).thenAnswer((Answer>) invocation -> leases); + when(leaseCoordinator.getCurrentAssignments()).thenAnswer((Answer>) invocation -> currentAssignments); + + final IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + final IRecordProcessor processor = mock(IRecordProcessor.class); + when(recordProcessorFactory.createProcessor()).thenReturn(processor); + + Worker worker = new Worker("testShutdownWithCompletedLeases", + recordProcessorFactory, + config, + streamConfig, + INITIAL_POSITION_TRIM_HORIZON, + parentShardPollIntervalMillis, + shardSyncIntervalMillis, + cleanupLeasesUponShardCompletion, + leaseCoordinator, + leaseCoordinator, + executorService, + metricsFactory, + taskBackoffTimeMillis, + failoverTimeMillis, + false, + shardPrioritization); + + final Map shardInfoShardConsumerMap = worker.getShardInfoShardConsumerMap(); + final ShardInfo completedShardInfo = KinesisClientLibLeaseCoordinator.convertLeaseToAssignment(completedLease); + final ShardConsumer completedShardConsumer = mock(ShardConsumer.class); + shardInfoShardConsumerMap.put(completedShardInfo, completedShardConsumer); + when(completedShardConsumer.getCurrentState()).thenReturn(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE); + + Callable callable = worker.createWorkerShutdownCallable(); + assertThat(worker.hasGracefulShutdownStarted(), equalTo(false)); + + GracefulShutdownContext gracefulShutdownContext = callable.call(); + assertThat(gracefulShutdownContext.getShutdownCompleteLatch().getCount(), equalTo(0L)); + assertThat(gracefulShutdownContext.getNotificationCompleteLatch().getCount(), equalTo(0L)); + assertThat(worker.hasGracefulShutdownStarted(), equalTo(true)); + } + @Test public void testRequestShutdownWithLostLease() throws Exception {