adding test to verify that worker shutdown does not block on completed leases

This commit is contained in:
Shitanshu Aggarwal 2019-10-21 02:58:50 +00:00
parent 17cff739c2
commit af46774a11

View file

@ -21,7 +21,6 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
@ -53,7 +52,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
@ -1100,6 +1098,63 @@ public class WorkerTest {
}
private String randomShardId() {
return UUID.randomUUID().toString();
}
@Test
public void testShutdownDoesNotBlockOnCompletedLeases() throws Exception {
final String shardId = randomShardId();
final String parentShardId = randomShardId();
final KinesisClientLease completedLease = mock(KinesisClientLease.class);
when(completedLease.getLeaseKey()).thenReturn(shardId);
when(completedLease.getParentShardIds()).thenReturn(Collections.singleton(parentShardId));
when(completedLease.getCheckpoint()).thenReturn(ExtendedSequenceNumber.SHARD_END);
when(completedLease.getConcurrencyToken()).thenReturn(UUID.randomUUID());
final StreamConfig streamConfig = mock(StreamConfig.class);
final IMetricsFactory metricsFactory = mock(IMetricsFactory.class);
final List<KinesisClientLease> leases = Collections.singletonList(completedLease);
final List<ShardInfo> currentAssignments = new ArrayList<>();
when(leaseCoordinator.getAssignments()).thenAnswer((Answer<List<KinesisClientLease>>) invocation -> leases);
when(leaseCoordinator.getCurrentAssignments()).thenAnswer((Answer<List<ShardInfo>>) invocation -> currentAssignments);
final IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
final IRecordProcessor processor = mock(IRecordProcessor.class);
when(recordProcessorFactory.createProcessor()).thenReturn(processor);
Worker worker = new Worker("testShutdownWithCompletedLeases",
recordProcessorFactory,
config,
streamConfig,
INITIAL_POSITION_TRIM_HORIZON,
parentShardPollIntervalMillis,
shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion,
leaseCoordinator,
leaseCoordinator,
executorService,
metricsFactory,
taskBackoffTimeMillis,
failoverTimeMillis,
false,
shardPrioritization);
final Map<ShardInfo, ShardConsumer> shardInfoShardConsumerMap = worker.getShardInfoShardConsumerMap();
final ShardInfo completedShardInfo = KinesisClientLibLeaseCoordinator.convertLeaseToAssignment(completedLease);
final ShardConsumer completedShardConsumer = mock(ShardConsumer.class);
shardInfoShardConsumerMap.put(completedShardInfo, completedShardConsumer);
when(completedShardConsumer.getCurrentState()).thenReturn(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE);
Callable<GracefulShutdownContext> callable = worker.createWorkerShutdownCallable();
assertThat(worker.hasGracefulShutdownStarted(), equalTo(false));
GracefulShutdownContext gracefulShutdownContext = callable.call();
assertThat(gracefulShutdownContext.getShutdownCompleteLatch().getCount(), equalTo(0L));
assertThat(gracefulShutdownContext.getNotificationCompleteLatch().getCount(), equalTo(0L));
assertThat(worker.hasGracefulShutdownStarted(), equalTo(true));
}
@Test
public void testRequestShutdownWithLostLease() throws Exception {