diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java index 792d566a..3405362a 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java @@ -17,6 +17,7 @@ package software.amazon.kinesis.lifecycle; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doNothing; @@ -30,6 +31,7 @@ import java.util.Collections; import java.util.List; import java.util.UUID; +import com.google.common.collect.ImmutableList; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -194,6 +196,52 @@ public class ShutdownTaskTest { verify(leaseCleanupManager, times(1)).enqueueForDeletion(any(LeasePendingDeletion.class)); } + @Test + public final void testCallThrowsWhenParentInfoNotPresentInLease() throws DependencyException, InvalidStateException, ProvisionedThroughputException { + shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(), + ExtendedSequenceNumber.LATEST); + task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer, + SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, + ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, + hierarchicalShardSyncer, NULL_METRICS_FACTORY, constructChildShard(), streamIdentifier, leaseCleanupManager); + + when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END); + Lease heldLease = LeaseHelper.createLease("shardId-0", "leaseOwner", ImmutableList.of("parent1", "parent2")); + Lease parentLease = LeaseHelper.createLease("shardId-1", "leaseOwner", Collections.emptyList()); + when(leaseCoordinator.getCurrentlyHeldLease("shardId-0")).thenReturn(heldLease); + when(leaseCoordinator.getCurrentlyHeldLease("shardId-1")).thenReturn(null, parentLease); + when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher); + when(leaseCoordinator.updateLease(Matchers.any(Lease.class), Matchers.any(UUID.class), Matchers.anyString(), Matchers.anyString())).thenReturn(true); + when(leaseRefresher.getLease("shardId-0")).thenReturn(heldLease); + // Return null lease first time to simulate partial parent lease info + when(leaseRefresher.getLease("shardId-1")).thenReturn(null, parentLease); + + // Make first attempt with partial parent info in lease table + TaskResult result = task.call(); + assertNotNull(result.getException()); + assertTrue(result.getException() instanceof InvalidStateException); + assertTrue(result.getException().getMessage().contains("has partial parent information in lease table")); + verify(recordsPublisher, never()).shutdown(); + verify(shardRecordProcessor, never()).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build()); + verify(shardRecordProcessor, never()).leaseLost(LeaseLostInput.builder().build()); + verify(leaseCoordinator, never()).updateLease(Matchers.any(Lease.class), Matchers.any(UUID.class), Matchers.anyString(), Matchers.anyString()); + verify(leaseRefresher, never()).createLeaseIfNotExists(Matchers.any(Lease.class)); + verify(leaseCoordinator, never()).dropLease(Matchers.any(Lease.class)); + verify(leaseCleanupManager, never()).enqueueForDeletion(any(LeasePendingDeletion.class)); + + // make next attempt with complete parent info in lease table + result = task.call(); + assertNull(result.getException()); + verify(recordsPublisher).shutdown(); + verify(shardRecordProcessor).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build()); + verify(shardRecordProcessor, never()).leaseLost(LeaseLostInput.builder().build()); + verify(leaseCoordinator).updateLease(Matchers.any(Lease.class), Matchers.any(UUID.class), Matchers.anyString(), Matchers.anyString()); + verify(leaseRefresher, times(1)).createLeaseIfNotExists(Matchers.any(Lease.class)); + verify(leaseCoordinator, never()).dropLease(Matchers.any(Lease.class)); + verify(leaseCleanupManager, times(1)).enqueueForDeletion(any(LeasePendingDeletion.class)); + + } + /** * Test method for {@link ShutdownTask#call()}. * This test is for the scenario that a ShutdownTask is created for detecting a false Shard End. @@ -269,4 +317,18 @@ public class ShutdownTaskTest { childShards.add(rightChild); return childShards; } + + private List constructChildShard() { + List childShards = new ArrayList<>(); + List parentShards = new ArrayList<>(); + parentShards.add(shardId); + parentShards.add("shardId-1"); + ChildShard leftChild = ChildShard.builder() + .shardId("shardId-2") + .parentShards(parentShards) + .hashKeyRange(ShardObjectHelper.newHashKeyRange("0", "49")) + .build(); + childShards.add(leftChild); + return childShards; + } }