Added unit test cases
This commit is contained in:
parent
ff1bee5791
commit
d9f5557ff5
1 changed files with 62 additions and 0 deletions
|
|
@ -17,6 +17,7 @@ package software.amazon.kinesis.lifecycle;
|
|||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertThat;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Mockito.doNothing;
|
||||
|
|
@ -30,6 +31,7 @@ import java.util.Collections;
|
|||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
|
|
@ -194,6 +196,52 @@ public class ShutdownTaskTest {
|
|||
verify(leaseCleanupManager, times(1)).enqueueForDeletion(any(LeasePendingDeletion.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
public final void testCallThrowsWhenParentInfoNotPresentInLease() throws DependencyException, InvalidStateException, ProvisionedThroughputException {
|
||||
shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(),
|
||||
ExtendedSequenceNumber.LATEST);
|
||||
task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer,
|
||||
SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards,
|
||||
ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher,
|
||||
hierarchicalShardSyncer, NULL_METRICS_FACTORY, constructChildShard(), streamIdentifier, leaseCleanupManager);
|
||||
|
||||
when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END);
|
||||
Lease heldLease = LeaseHelper.createLease("shardId-0", "leaseOwner", ImmutableList.of("parent1", "parent2"));
|
||||
Lease parentLease = LeaseHelper.createLease("shardId-1", "leaseOwner", Collections.emptyList());
|
||||
when(leaseCoordinator.getCurrentlyHeldLease("shardId-0")).thenReturn(heldLease);
|
||||
when(leaseCoordinator.getCurrentlyHeldLease("shardId-1")).thenReturn(null, parentLease);
|
||||
when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher);
|
||||
when(leaseCoordinator.updateLease(Matchers.any(Lease.class), Matchers.any(UUID.class), Matchers.anyString(), Matchers.anyString())).thenReturn(true);
|
||||
when(leaseRefresher.getLease("shardId-0")).thenReturn(heldLease);
|
||||
// Return null lease first time to simulate partial parent lease info
|
||||
when(leaseRefresher.getLease("shardId-1")).thenReturn(null, parentLease);
|
||||
|
||||
// Make first attempt with partial parent info in lease table
|
||||
TaskResult result = task.call();
|
||||
assertNotNull(result.getException());
|
||||
assertTrue(result.getException() instanceof InvalidStateException);
|
||||
assertTrue(result.getException().getMessage().contains("has partial parent information in lease table"));
|
||||
verify(recordsPublisher, never()).shutdown();
|
||||
verify(shardRecordProcessor, never()).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build());
|
||||
verify(shardRecordProcessor, never()).leaseLost(LeaseLostInput.builder().build());
|
||||
verify(leaseCoordinator, never()).updateLease(Matchers.any(Lease.class), Matchers.any(UUID.class), Matchers.anyString(), Matchers.anyString());
|
||||
verify(leaseRefresher, never()).createLeaseIfNotExists(Matchers.any(Lease.class));
|
||||
verify(leaseCoordinator, never()).dropLease(Matchers.any(Lease.class));
|
||||
verify(leaseCleanupManager, never()).enqueueForDeletion(any(LeasePendingDeletion.class));
|
||||
|
||||
// make next attempt with complete parent info in lease table
|
||||
result = task.call();
|
||||
assertNull(result.getException());
|
||||
verify(recordsPublisher).shutdown();
|
||||
verify(shardRecordProcessor).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build());
|
||||
verify(shardRecordProcessor, never()).leaseLost(LeaseLostInput.builder().build());
|
||||
verify(leaseCoordinator).updateLease(Matchers.any(Lease.class), Matchers.any(UUID.class), Matchers.anyString(), Matchers.anyString());
|
||||
verify(leaseRefresher, times(1)).createLeaseIfNotExists(Matchers.any(Lease.class));
|
||||
verify(leaseCoordinator, never()).dropLease(Matchers.any(Lease.class));
|
||||
verify(leaseCleanupManager, times(1)).enqueueForDeletion(any(LeasePendingDeletion.class));
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Test method for {@link ShutdownTask#call()}.
|
||||
* This test is for the scenario that a ShutdownTask is created for detecting a false Shard End.
|
||||
|
|
@ -269,4 +317,18 @@ public class ShutdownTaskTest {
|
|||
childShards.add(rightChild);
|
||||
return childShards;
|
||||
}
|
||||
|
||||
private List<ChildShard> constructChildShard() {
|
||||
List<ChildShard> childShards = new ArrayList<>();
|
||||
List<String> parentShards = new ArrayList<>();
|
||||
parentShards.add(shardId);
|
||||
parentShards.add("shardId-1");
|
||||
ChildShard leftChild = ChildShard.builder()
|
||||
.shardId("shardId-2")
|
||||
.parentShards(parentShards)
|
||||
.hashKeyRange(ShardObjectHelper.newHashKeyRange("0", "49"))
|
||||
.build();
|
||||
childShards.add(leftChild);
|
||||
return childShards;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue