From 1ccbe614eb6005262e81845ec774869b64215472 Mon Sep 17 00:00:00 2001 From: Joshua Kim Date: Fri, 20 Mar 2020 06:00:26 -0700 Subject: [PATCH] Re-adding multistream tests. --- .../leases/HierarchicalShardSyncerTest.java | 71 +++++++++++++++++++ 1 file changed, 71 insertions(+) diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java index 0e0c0d29..2b5d562b 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java @@ -179,6 +179,59 @@ public class HierarchicalShardSyncerTest { assertThat(newLeaseKeys, equalTo(expectedLeaseIds)); } + /** + * Test determineNewLeasesToCreate() where there are no leases and no resharding operations have been performed, but + * one of the shards was marked as inconsistent. + */ + @Test + public void testDetermineNewLeasesToCreate0Leases0Reshards1Inconsistent() { + final String shardId0 = "shardId-0"; + final String shardId1 = "shardId-1"; + final String shardId2 = "shardId-2"; + final SequenceNumberRange sequenceRange = ShardObjectHelper.newSequenceNumberRange("342980", null); + + final List shards = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange), + ShardObjectHelper.newShard(shardId1, null, null, sequenceRange), + ShardObjectHelper.newShard(shardId2, shardId1, null, sequenceRange)); + final List currentLeases = Collections.emptyList(); + + final Set inconsistentShardIds = new HashSet<>(Collections.singletonList(shardId2)); + + final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, + INITIAL_POSITION_LATEST, inconsistentShardIds); + final Set newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); + final Set expectedLeaseShardIds = new HashSet<>(Arrays.asList(shardId0, shardId1)); + assertThat(newLeases.size(), equalTo(expectedLeaseShardIds.size())); + assertThat(newLeaseKeys, equalTo(expectedLeaseShardIds)); + } + + /** + * Test determineNewLeasesToCreate() where there are no leases and no resharding operations have been performed, but + * one of the shards was marked as inconsistent. + */ + @Test + public void testDetermineNewLeasesToCreate0Leases0Reshards1InconsistentMultiStream() { + final String shardId0 = "shardId-0"; + final String shardId1 = "shardId-1"; + final String shardId2 = "shardId-2"; + final SequenceNumberRange sequenceRange = ShardObjectHelper.newSequenceNumberRange("342980", null); + + final List shards = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange), + ShardObjectHelper.newShard(shardId1, null, null, sequenceRange), + ShardObjectHelper.newShard(shardId2, shardId1, null, sequenceRange)); + final List currentLeases = Collections.emptyList(); + + final Set inconsistentShardIds = new HashSet<>(Collections.singletonList(shardId2)); + + final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, + INITIAL_POSITION_LATEST, inconsistentShardIds, MULTI_STREAM_ARGS); + final Set newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); + final Set expectedLeaseShardIds = new HashSet<>( + toMultiStreamLeaseList(Arrays.asList(shardId0, shardId1))); + assertThat(newLeases.size(), equalTo(expectedLeaseShardIds.size())); + assertThat(newLeaseKeys, equalTo(expectedLeaseShardIds)); + } + /** * Test bootstrapShardLeases() starting at TRIM_HORIZON ("beginning" of stream) */ @@ -1045,6 +1098,24 @@ public class HierarchicalShardSyncerTest { } } + @Test + public void testDetermineNewLeasesToCreateIgnoreClosedShard() { + final String lastShardId = "shardId-1"; + final List currentLeases = new ArrayList<>(); + + final List shards = Arrays.asList( + ShardObjectHelper.newShard("shardId-0", null, null, + ShardObjectHelper.newSequenceNumberRange("303", "404")), + ShardObjectHelper.newShard(lastShardId, null, null, + ShardObjectHelper.newSequenceNumberRange("405", null))); + + final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, + INITIAL_POSITION_LATEST); + + assertThat(newLeases.size(), equalTo(1)); + assertThat(newLeases.get(0).leaseKey(), equalTo(lastShardId)); + } + // /** // * Test CheckIfDescendantAndAddNewLeasesForAncestors (initial position Latest) // * Shard structure (each level depicts a stream segment):