Adding unit tests for bootstrapping cases.

This commit is contained in:
Joshua Kim 2020-03-24 06:36:36 -04:00
parent 039dd176f8
commit 16cf142b44

View file

@ -21,6 +21,7 @@ package software.amazon.kinesis.leases;
import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
@ -29,6 +30,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import java.math.BigInteger; import java.math.BigInteger;
import java.time.Instant;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
@ -54,6 +56,8 @@ import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.awssdk.services.kinesis.model.HashKeyRange; import software.amazon.awssdk.services.kinesis.model.HashKeyRange;
import software.amazon.awssdk.services.kinesis.model.SequenceNumberRange; import software.amazon.awssdk.services.kinesis.model.SequenceNumberRange;
import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardFilter;
import software.amazon.awssdk.services.kinesis.model.ShardFilterType;
import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.common.StreamIdentifier;
@ -1608,6 +1612,62 @@ public class HierarchicalShardSyncerTest {
assertThat(newLeaseMap.isEmpty(), equalTo(true)); assertThat(newLeaseMap.isEmpty(), equalTo(true));
} }
@Test
public void testEmptyLeaseTableBootstrapUsesShardFilterWithAtLatest() throws Exception {
ShardFilter shardFilter = ShardFilter.builder().type(ShardFilterType.AT_LATEST).build();
testEmptyLeaseTableBootstrapUsesListShardsWithFilter(INITIAL_POSITION_LATEST, shardFilter);
}
@Test
public void testEmptyLeaseTableBootstrapUsesShardFilterWithAtTrimHorizon() throws Exception {
ShardFilter shardFilter = ShardFilter.builder().type(ShardFilterType.AT_TRIM_HORIZON).build();
testEmptyLeaseTableBootstrapUsesListShardsWithFilter(INITIAL_POSITION_TRIM_HORIZON, shardFilter);
}
@Test
public void testEmptyLeaseTableBootstrapUsesShardFilterWithAtTimestamp() throws Exception {
ShardFilter shardFilter = ShardFilter.builder().type(ShardFilterType.AT_TIMESTAMP).timestamp(new Date(1000L).toInstant()).build();
testEmptyLeaseTableBootstrapUsesListShardsWithFilter(INITIAL_POSITION_AT_TIMESTAMP, shardFilter);
}
public void testEmptyLeaseTableBootstrapUsesListShardsWithFilter(InitialPositionInStreamExtended initialPosition, ShardFilter shardFilter) throws Exception {
final String shardId0 = "shardId-0";
final List<Shard> shards = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, ShardObjectHelper.newSequenceNumberRange("1", null)));
final List<Lease> currentLeases = Collections.emptyList();
when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(true);
when(shardDetector.listShards()).thenReturn(shards);
when(dynamoDBLeaseRefresher.listLeases()).thenReturn(currentLeases);
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, initialPosition,
garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
verify(shardDetector, atLeast(1)).listShardsWithFilter(shardFilter);
verify(shardDetector, never()).listShards();
}
@Test
public void testNonEmptyLeaseTableUsesListShards() throws Exception {
final String shardId0 = "shardId-0";
final String shardId1 = "shardId-1";
final List<Shard> shardsWithLeases = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, ShardObjectHelper.newSequenceNumberRange("1", "2")));
final List<Shard> shardsWithoutLeases = Arrays.asList(ShardObjectHelper.newShard(shardId1, null, null, ShardObjectHelper.newSequenceNumberRange("3", "4")));
final List<Lease> currentLeases = createLeasesFromShards(shardsWithLeases, ExtendedSequenceNumber.LATEST, "foo");
when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(false);
when(shardDetector.listShards()).thenReturn(shardsWithoutLeases);
when(dynamoDBLeaseRefresher.listLeases()).thenReturn(currentLeases);
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
verify(shardDetector, atLeast(1)).listShards();
}
// /** // /**
// * Test CheckIfDescendantAndAddNewLeasesForAncestors - two parents, there is a lease for one parent. // * Test CheckIfDescendantAndAddNewLeasesForAncestors - two parents, there is a lease for one parent.
// */ // */