diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java index 2b2df48c..a2700097 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java @@ -18,9 +18,11 @@ import java.io.Serializable; import java.math.BigInteger; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; @@ -33,6 +35,7 @@ import java.util.stream.Collectors; import com.google.common.annotations.VisibleForTesting; import lombok.AllArgsConstructor; import lombok.Data; +import lombok.NoArgsConstructor; import lombok.experimental.Accessors; import org.apache.commons.lang3.StringUtils; @@ -446,7 +449,7 @@ public class HierarchicalShardSyncer { /** * Note: Package level access for testing purposes only. * Check if this shard is a descendant of a shard that is (or will be) processed. - * Create leases for the ancestors of this shard as required. + * Create leases for the first ancestor of this shard that needs to be processed, as required. * See javadoc of determineNewLeasesToCreate() for rules and example. * * @param shardId The shardId to check. @@ -462,10 +465,10 @@ public class HierarchicalShardSyncer { static boolean checkIfDescendantAndAddNewLeasesForAncestors(final String shardId, final InitialPositionInStreamExtended initialPosition, final Set shardIdsOfCurrentLeases, final Map shardIdToShardMapOfAllKinesisShards, - final Map shardIdToLeaseMapOfNewShards, final Map memoizationContext, + final Map shardIdToLeaseMapOfNewShards, final MemoizationContext memoizationContext, final MultiStreamArgs multiStreamArgs) { final String streamIdentifier = getStreamIdentifier(multiStreamArgs); - final Boolean previousValue = memoizationContext.get(shardId); + final Boolean previousValue = memoizationContext.isDescendant(shardId); if (previousValue != null) { return previousValue; } @@ -480,13 +483,17 @@ public class HierarchicalShardSyncer { // We don't need to add leases of its ancestors, // because we'd have done it when creating a lease for this shard. } else { + final Shard shard = shardIdToShardMapOfAllKinesisShards.get(shardId); final Set parentShardIds = getParentShardIds(shard, shardIdToShardMapOfAllKinesisShards); for (String parentShardId : parentShardIds) { - // Check if the parent is a descendant, and include its ancestors. - if (checkIfDescendantAndAddNewLeasesForAncestors(parentShardId, initialPosition, - shardIdsOfCurrentLeases, shardIdToShardMapOfAllKinesisShards, shardIdToLeaseMapOfNewShards, - memoizationContext, multiStreamArgs)) { + // Check if the parent is a descendant, and include its ancestors. Or, if the parent is NOT a + // descendant but we should create a lease for it anyway (e.g. to include in processing from + // TRIM_HORIZON or AT_TIMESTAMP). If either is true, then we mark the current shard as a descendant. + final boolean isParentDescendant = checkIfDescendantAndAddNewLeasesForAncestors(parentShardId, + initialPosition, shardIdsOfCurrentLeases, shardIdToShardMapOfAllKinesisShards, + shardIdToLeaseMapOfNewShards, memoizationContext, multiStreamArgs); + if (isParentDescendant || memoizationContext.shouldCreateLease(parentShardId)) { isDescendant = true; descendantParentShardIds.add(parentShardId); log.debug("{} : Parent shard {} is a descendant.", streamIdentifier, parentShardId); @@ -499,48 +506,87 @@ public class HierarchicalShardSyncer { if (isDescendant) { for (String parentShardId : parentShardIds) { if (!shardIdsOfCurrentLeases.contains(parentShardId)) { - log.debug("{} : Need to create a lease for shardId {}", streamIdentifier, parentShardId); Lease lease = shardIdToLeaseMapOfNewShards.get(parentShardId); + + /** + * If the lease for the parent shard does not already exist, there are two cases in which we + * would want to create it: + * - If we have already marked the parentShardId for lease creation in a prior recursive + * call. This could happen if we are trying to process from TRIM_HORIZON or AT_TIMESTAMP. + * - If the parent shard is not a descendant but the current shard is a descendant, then + * the parent shard is the oldest shard in the shard hierarchy that does not have an + * ancestor in the lease table (the adjacent parent is necessarily a descendant, and + * therefore covered in the lease table). So we should create a lease for the parent. + */ if (lease == null) { - lease = multiStreamArgs.isMultiStreamMode() ? - newKCLMultiStreamLease(shardIdToShardMapOfAllKinesisShards.get(parentShardId), - multiStreamArgs.streamIdentifier()) : - newKCLLease(shardIdToShardMapOfAllKinesisShards.get(parentShardId)); - shardIdToLeaseMapOfNewShards.put(parentShardId, lease); + if (memoizationContext.shouldCreateLease(parentShardId) || + !descendantParentShardIds.contains(parentShardId)) { + log.debug("{} : Need to create a lease for shardId {}", streamIdentifier, parentShardId); + lease = multiStreamArgs.isMultiStreamMode() ? + newKCLMultiStreamLease(shardIdToShardMapOfAllKinesisShards.get(parentShardId), + multiStreamArgs.streamIdentifier()) : + newKCLLease(shardIdToShardMapOfAllKinesisShards.get(parentShardId)); + shardIdToLeaseMapOfNewShards.put(parentShardId, lease); + } } - if (descendantParentShardIds.contains(parentShardId) - && !initialPosition.getInitialPositionInStream() + /** + * If the shard is a descendant and the specified initial position is AT_TIMESTAMP, then the + * checkpoint should be set to AT_TIMESTAMP, else to TRIM_HORIZON. For AT_TIMESTAMP, we will + * add a lease just like we do for TRIM_HORIZON. However we will only return back records + * with server-side timestamp at or after the specified initial position timestamp. + * + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5 - shards till epoch 102 + * \ / \ / | | + * 6 7 4 5 - shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * + * Current leases: (4, 5, 7) + * + * For the above example, suppose the initial position in stream is set to AT_TIMESTAMP with + * timestamp value 206. We will then create new leases for all the shards 0 and 1 (with + * checkpoint set AT_TIMESTAMP), even though these ancestor shards have an epoch less than + * 206. However as we begin processing the ancestor shards, their checkpoints would be + * updated to SHARD_END and their leases would then be deleted since they won't have records + * with server-side timestamp at/after 206. And after that we will begin processing the + * descendant shards with epoch at/after 206 and we will return the records that meet the + * timestamp requirement for these shards. + */ + if (lease != null) { + if (descendantParentShardIds.contains(parentShardId) + && !initialPosition.getInitialPositionInStream() .equals(InitialPositionInStream.AT_TIMESTAMP)) { - lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); - } else { - lease.checkpoint(convertToCheckpoint(initialPosition)); + lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); + } else { + lease.checkpoint(convertToCheckpoint(initialPosition)); + } } } } } else { - // This shard should be included, if the customer wants to process all records in the stream or - // if the initial position is AT_TIMESTAMP. For AT_TIMESTAMP, we will add a lease just like we do - // for TRIM_HORIZON. However we will only return back records with server-side timestamp at or - // after the specified initial position timestamp. + // This shard is not a descendant, but should still be included if the customer wants to process all + // records in the stream or if the initial position is AT_TIMESTAMP. For AT_TIMESTAMP, we will add a + // lease just like we do for TRIM_HORIZON. However we will only return back records with server-side + // timestamp at or after the specified initial position timestamp. if (initialPosition.getInitialPositionInStream().equals(InitialPositionInStream.TRIM_HORIZON) || initialPosition.getInitialPositionInStream() .equals(InitialPositionInStream.AT_TIMESTAMP)) { - isDescendant = true; + memoizationContext.setShouldCreateLease(shardId, true); } } - } } - memoizationContext.put(shardId, isDescendant); + memoizationContext.setIsDescendant(shardId, isDescendant); return isDescendant; } static boolean checkIfDescendantAndAddNewLeasesForAncestors(final String shardId, final InitialPositionInStreamExtended initialPosition, final Set shardIdsOfCurrentLeases, final Map shardIdToShardMapOfAllKinesisShards, - final Map shardIdToLeaseMapOfNewShards, final Map memoizationContext) { + final Map shardIdToLeaseMapOfNewShards, MemoizationContext memoizationContext) { return checkIfDescendantAndAddNewLeasesForAncestors(shardId, initialPosition, shardIdsOfCurrentLeases, shardIdToShardMapOfAllKinesisShards, shardIdToLeaseMapOfNewShards, memoizationContext, new MultiStreamArgs(false, null)); @@ -1033,8 +1079,10 @@ public class HierarchicalShardSyncer { * Note: Package level access only for testing purposes. *

* For each open (no ending sequence number) shard without open parents that doesn't already have a lease, - * determine if it is a descendent of any shard which is or will be processed (e.g. for which a lease exists): - * If so, set checkpoint of the shard to TrimHorizon and also create leases for ancestors if needed. + * determine if it is a descendant of any shard which is or will be processed (e.g. for which a lease exists): + * If so, create a lease for the first ancestor that needs to be processed (if needed). We will create leases + * for no more than one level in the ancestry tree. Once we find the first ancestor that needs to be processed, + * we will avoid creating leases for further descendants of that ancestor. * If not, set checkpoint of the shard to the initial position specified by the client. * To check if we need to create leases for ancestors, we use the following rules: * * If we began (or will begin) processing data for a shard, then we must reach end of that shard before @@ -1052,11 +1100,19 @@ public class HierarchicalShardSyncer { * Shard structure (each level depicts a stream segment): * 0 1 2 3 4 5 - shards till epoch 102 * \ / \ / | | - * 6 7 4 5 - shards from epoch 103 - 205 - * \ / | / \ - * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) - * Current leases: (3, 4, 5) - * New leases to create: (2, 6, 7, 8, 9, 10) + * 6 7 4 5 - shards from epoch 103 - 205 + * \ / | / \ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * + * Current leases: (4, 5, 7) + * + * If initial position is LATEST: + * - New leases to create: (6) + * If initial position is TRIM_HORIZON: + * - New leases to create: (0, 1) + * If initial position is AT_TIMESTAMP(epoch=200): + * - New leases to create: (0, 1) + * *

* The leases returned are sorted by the starting sequence number - following the same order * when persisting the leases in DynamoDB will ensure that we recover gracefully if we fail @@ -1083,7 +1139,7 @@ public class HierarchicalShardSyncer { .collect(Collectors.toSet()); final List openShards = getOpenShards(shards, streamIdentifier); - final Map memoizationContext = new HashMap<>(); + final MemoizationContext memoizationContext = new MemoizationContext(); // Iterate over the open shards and find those that don't have any lease entries. for (Shard shard : openShards) { @@ -1094,45 +1150,32 @@ public class HierarchicalShardSyncer { } else if (inconsistentShardIds.contains(shardId)) { log.info("{} : shardId {} is an inconsistent child. Not creating a lease", streamIdentifier, shardId); } else { - log.debug("{} : Need to create a lease for shardId {}", streamIdentifier, shardId); - final Lease newLease = multiStreamArgs.isMultiStreamMode() ? - newKCLMultiStreamLease(shard, multiStreamArgs.streamIdentifier()) : - newKCLLease(shard); + log.debug("{} : Beginning traversal of ancestry tree for shardId {}", streamIdentifier, shardId); + + // A shard is a descendant if at least one if its ancestors exists in the lease table. + // We will create leases for only one level in the ancestry tree. Once we find the first ancestor + // that needs to be processed in order to complete the hash range, we will not create leases for + // further descendants of that ancestor. final boolean isDescendant = checkIfDescendantAndAddNewLeasesForAncestors(shardId, initialPosition, shardIdsOfCurrentLeases, shardIdToShardMapOfAllKinesisShards, shardIdToNewLeaseMap, memoizationContext, multiStreamArgs); - /** - * If the shard is a descendant and the specified initial position is AT_TIMESTAMP, then the - * checkpoint should be set to AT_TIMESTAMP, else to TRIM_HORIZON. For AT_TIMESTAMP, we will add a - * lease just like we do for TRIM_HORIZON. However we will only return back records with server-side - * timestamp at or after the specified initial position timestamp. - * - * Shard structure (each level depicts a stream segment): - * 0 1 2 3 4 5 - shards till epoch 102 - * \ / \ / | | - * 6 7 4 5 - shards from epoch 103 - 205 - * \ / | /\ - * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) - * - * Current leases: empty set - * - * For the above example, suppose the initial position in stream is set to AT_TIMESTAMP with - * timestamp value 206. We will then create new leases for all the shards (with checkpoint set to - * AT_TIMESTAMP), including the ancestor shards with epoch less than 206. However as we begin - * processing the ancestor shards, their checkpoints would be updated to SHARD_END and their leases - * would then be deleted since they won't have records with server-side timestamp at/after 206. And - * after that we will begin processing the descendant shards with epoch at/after 206 and we will - * return the records that meet the timestamp requirement for these shards. - */ - if (isDescendant - && !initialPosition.getInitialPositionInStream().equals(InitialPositionInStream.AT_TIMESTAMP)) { - newLease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); - } else { + // If shard is a descendant, the leases for its ancestors were already created above. Open shards + // that are NOT descendants will not have leases yet, so we create them here. We will not create + // leases for open shards that ARE descendants yet - leases for these shards will be created upon + // SHARD_END of their parents. + if (!isDescendant) { + log.debug("{} : shardId {} has no ancestors. Creating a lease.", streamIdentifier, shardId); + final Lease newLease = multiStreamArgs.isMultiStreamMode() ? + newKCLMultiStreamLease(shard, multiStreamArgs.streamIdentifier()) : + newKCLLease(shard); newLease.checkpoint(convertToCheckpoint(initialPosition)); + log.debug("{} : Set checkpoint of {} to {}", streamIdentifier, newLease.leaseKey(), newLease.checkpoint()); + shardIdToNewLeaseMap.put(shardId, newLease); + } else { + log.debug("{} : shardId {} is a descendant whose ancestors should already have leases. " + + "Not creating a lease.", streamIdentifier, shardId); } - log.debug("{} : Set checkpoint of {} to {}", streamIdentifier, newLease.leaseKey(), newLease.checkpoint()); - shardIdToNewLeaseMap.put(shardId, newLease); } } @@ -1143,4 +1186,29 @@ public class HierarchicalShardSyncer { return newLeasesToCreate; } } + + /** + * Helper class to pass around state between recursive traversals of shard hierarchy. + */ + @NoArgsConstructor + static class MemoizationContext { + private Map isDescendantMap = new HashMap<>(); + private Map shouldCreateLeaseMap = new HashMap<>(); + + Boolean isDescendant(String shardId) { + return isDescendantMap.get(shardId); + } + + void setIsDescendant(String shardId, Boolean isDescendant) { + isDescendantMap.put(shardId, isDescendant); + } + + Boolean shouldCreateLease(String shardId) { + return shouldCreateLeaseMap.computeIfAbsent(shardId, x -> Boolean.FALSE); + } + + void setShouldCreateLease(String shardId, Boolean shouldCreateLease) { + shouldCreateLeaseMap.put(shardId, shouldCreateLease); + } + } } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java index acfa3c51..096bf33a 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java @@ -20,7 +20,6 @@ package software.amazon.kinesis.leases; import static org.hamcrest.CoreMatchers.equalTo; import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doNothing; @@ -34,6 +33,7 @@ import java.math.BigInteger; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; import java.util.Date; import java.util.HashMap; import java.util.HashSet; @@ -43,7 +43,6 @@ import java.util.Set; import java.util.UUID; import java.util.function.Consumer; import java.util.stream.Collectors; -import java.util.stream.IntStream; import java.util.stream.Stream; import org.apache.commons.lang3.StringUtils; @@ -71,6 +70,8 @@ import software.amazon.kinesis.metrics.MetricsScope; import software.amazon.kinesis.metrics.NullMetricsScope; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; +import static software.amazon.kinesis.leases.HierarchicalShardSyncer.MemoizationContext; + @RunWith(MockitoJUnitRunner.class) // CHECKSTYLE:IGNORE JavaNCSS FOR NEXT 800 LINES public class HierarchicalShardSyncerTest { @@ -473,14 +474,180 @@ public class HierarchicalShardSyncerTest { verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); } + /* + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Initial position: TRIM_HORIZON + * Leases to create: (0, 1, 2, 3, 4, 5) + */ @Test - public void testCheckAndCreateLeasesForNewShardsAtTrimHorizon() throws Exception { - testCheckAndCreateLeaseForShardsIfMissing(constructShardListForGraphA(), INITIAL_POSITION_TRIM_HORIZON); + public void testCheckAndCreateLeasesForNewShardsAtTrimHorizonWithEmptyLeaseTable() throws Exception { + final List shards = constructShardListForGraphA(); + final Set expectedLeaseKeysToCreate = new HashSet<>(Arrays.asList("shardId-0", "shardId-1", "shardId-2", + "shardId-3", "shardId-4", "shardId-5")); + testCheckAndCreateLeaseForShardsIfMissing(shards, INITIAL_POSITION_TRIM_HORIZON, expectedLeaseKeysToCreate); } + /* + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Initial position: AT_TIMESTAMP(1000) + * Leases to create: (8, 4, 9, 10) + */ @Test - public void testCheckAndCreateLeasesForNewShardsAtTimestamp() throws Exception { - testCheckAndCreateLeaseForShardsIfMissing(constructShardListForGraphA(), INITIAL_POSITION_AT_TIMESTAMP); + public void testCheckAndCreateLeasesForNewShardsAtTimestampWithEmptyLeaseTable1() throws Exception { + final List shards = constructShardListForGraphA(); + final Set expectedLeaseKeysToCreate = new HashSet<>(Arrays.asList("shardId-8", "shardId-4", "shardId-9", + "shardId-10")); + testCheckAndCreateLeaseForShardsIfMissing(shards, INITIAL_POSITION_AT_TIMESTAMP, expectedLeaseKeysToCreate); + } + + /* + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Initial position: AT_TIMESTAMP(200) + * Leases to create: (6, 7, 4, 5) + */ + @Test + public void testCheckAndCreateLeasesForNewShardsAtTimestampWithEmptyLeaseTable2() throws Exception { + final List shards = constructShardListForGraphA(); + final Set expectedLeaseKeysToCreate = new HashSet<>(Arrays.asList("shardId-6", "shardId-7", "shardId-4", + "shardId-5")); + final InitialPositionInStreamExtended initialPosition = InitialPositionInStreamExtended + .newInitialPositionAtTimestamp(new Date(200L)); + testCheckAndCreateLeaseForShardsIfMissing(shards, initialPosition, expectedLeaseKeysToCreate); + } + + /* + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Initial position: LATEST + * Leases to create: (8, 4, 9, 10) + */ + @Test + public void testCheckAndCreateLeasesForNewShardsAtLatestWithEmptyLeaseTable() throws Exception { + final List shards = constructShardListForGraphA(); + final Set expectedLeaseKeysToCreate = new HashSet<>(Arrays.asList("shardId-8", "shardId-4", "shardId-9", + "shardId-10")); + testCheckAndCreateLeaseForShardsIfMissing(shards, INITIAL_POSITION_LATEST, expectedLeaseKeysToCreate); + } + + /* + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Missing leases: (0, 6, 8) + * Initial position: TRIM_HORIZON + * Leases to create: (0) + */ + @Test + public void testCheckAndCreateLeasesForNewShardsAtTrimHorizonWithPartialLeaseTable() throws Exception { + final List shards = constructShardListForGraphA(); + // Leases for shard-0 and its descendants (shard-6, and shard-8) are missing. Expect lease sync to recover the + // lease for shard-0 when reading from TRIM_HORIZON. + final Set missingLeaseKeys = new HashSet<>(Arrays.asList("shardId-0", "shardId-6", "shardId-8")); + final List shardsWithLeases = shards.stream() + .filter(s -> !missingLeaseKeys.contains(s.shardId())).collect(Collectors.toList()); + final List existingLeases = createLeasesFromShards(shardsWithLeases, ExtendedSequenceNumber.TRIM_HORIZON, LEASE_OWNER); + + final Set expectedLeaseKeysToCreate = new HashSet<>(Arrays.asList("shardId-0")); + testCheckAndCreateLeaseForShardsIfMissing(shards, INITIAL_POSITION_TRIM_HORIZON, expectedLeaseKeysToCreate, existingLeases); + } + + /* + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Missing leases: (0, 6, 8) + * Initial position: AT_TIMESTAMP(1000) + * Leases to create: (0) + */ + @Test + public void testCheckAndCreateLeasesForNewShardsAtTimestampWithPartialLeaseTable1() throws Exception { + final List shards = constructShardListForGraphA(); + // Leases for shard-0 and its descendants (shard-6, and shard-8) are missing. Expect lease sync to recover the + // lease for shard-0 when reading from AT_TIMESTAMP. + final Set missingLeaseKeys = new HashSet<>(Arrays.asList("shardId-0", "shardId-6", "shardId-8")); + final List shardsWithLeases = shards.stream() + .filter(s -> !missingLeaseKeys.contains(s.shardId())).collect(Collectors.toList()); + final List existingLeases = createLeasesFromShards(shardsWithLeases, ExtendedSequenceNumber.AT_TIMESTAMP, LEASE_OWNER); + + final Set expectedLeaseKeysToCreate = new HashSet<>(Arrays.asList("shardId-0")); + testCheckAndCreateLeaseForShardsIfMissing(shards, INITIAL_POSITION_AT_TIMESTAMP, expectedLeaseKeysToCreate, existingLeases); + } + + /* + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Missing leases: (0, 6, 8) + * Initial position: AT_TIMESTAMP(200) + * Leases to create: (0) + */ + @Test + public void testCheckAndCreateLeasesForNewShardsAtTimestampWithPartialLeaseTable2() throws Exception { + final List shards = constructShardListForGraphA(); + final InitialPositionInStreamExtended initialPosition = InitialPositionInStreamExtended + .newInitialPositionAtTimestamp(new Date(200L)); + // Leases for shard-0 and its descendants (shard-6, and shard-8) are missing. Expect lease sync to recover the + // lease for shard-0 when reading from AT_TIMESTAMP. + final Set missingLeaseKeys = new HashSet<>(Arrays.asList("shardId-0", "shardId-6", "shardId-8")); + final List shardsWithLeases = shards.stream() + .filter(s -> !missingLeaseKeys.contains(s.shardId())).collect(Collectors.toList()); + final List existingLeases = createLeasesFromShards(shardsWithLeases, ExtendedSequenceNumber.AT_TIMESTAMP, LEASE_OWNER); + + final Set expectedLeaseKeysToCreate = new HashSet<>(Arrays.asList("shardId-0")); + testCheckAndCreateLeaseForShardsIfMissing(shards, initialPosition, expectedLeaseKeysToCreate, existingLeases); + } + + /* + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Missing leases: (0, 6, 8) + * Initial position: LATEST + * Leases to create: (0) + */ + @Test + public void testCheckAndCreateLeasesForNewShardsAtLatestWithPartialLeaseTable() throws Exception { + final List shards = constructShardListForGraphA(); + // Leases for shard-0 and its descendants (shard-6, and shard-8) are missing. Expect lease sync to recover the + // lease for shard-0 when reading from LATEST. + final Set missingLeaseKeys = new HashSet<>(Arrays.asList("shardId-0", "shardId-6", "shardId-8")); + final List shardsWithLeases = shards.stream() + .filter(s -> !missingLeaseKeys.contains(s.shardId())).collect(Collectors.toList()); + final List existingLeases = createLeasesFromShards(shardsWithLeases, ExtendedSequenceNumber.LATEST, LEASE_OWNER); + + final Set expectedLeaseKeysToCreate = new HashSet<>(Arrays.asList("shardId-0")); + testCheckAndCreateLeaseForShardsIfMissing(shards, INITIAL_POSITION_LATEST, expectedLeaseKeysToCreate, existingLeases); } @Test(expected = KinesisClientLibIOException.class) @@ -657,7 +824,7 @@ public class HierarchicalShardSyncerTest { final Set createLeases = new HashSet<>(leaseCreateCaptor.getAllValues()); - final Set expectedCreateLeases = new HashSet<>(createLeasesFromShards(shards, sequenceNumber, null)); + final Set expectedCreateLeases = getExpectedLeasesForGraphA(shards, sequenceNumber, position); assertThat(createLeases, equalTo(expectedCreateLeases)); @@ -703,7 +870,8 @@ public class HierarchicalShardSyncerTest { } private void testCheckAndCreateLeasesForNewShardsAndClosedShardWithDeleteLeaseExceptions( - final ExtendedSequenceNumber sequenceNumber, final InitialPositionInStreamExtended position) + final ExtendedSequenceNumber sequenceNumber, + final InitialPositionInStreamExtended position) throws Exception { final String shardIdPrefix = "shardId-%d"; final List shards = constructShardListForGraphA(); @@ -732,7 +900,7 @@ public class HierarchicalShardSyncerTest { final Set createLeases = new HashSet<>(leaseCreateCaptor.getAllValues()); - final Set expectedCreateLeases = new HashSet<>(createLeasesFromShards(shards, sequenceNumber, null)); + final Set expectedCreateLeases = getExpectedLeasesForGraphA(shards, sequenceNumber, position); assertThat(createLeases, equalTo(expectedCreateLeases)); @@ -843,7 +1011,7 @@ public class HierarchicalShardSyncerTest { SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); final Set createLeases = new HashSet<>(leaseCreateCaptor.getAllValues()); - final Set expectedCreateLeases = new HashSet<>(createLeasesFromShards(shards, sequenceNumber, null)); + final Set expectedCreateLeases = getExpectedLeasesForGraphA(shards, sequenceNumber, position); assertThat(createLeases, equalTo(expectedCreateLeases)); @@ -933,7 +1101,8 @@ public class HierarchicalShardSyncerTest { SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); final Set createLeases = new HashSet<>(leaseCreateCaptor.getAllValues()); - final Set expectedCreateLeases = new HashSet<>(createLeasesFromShards(shards, sequenceNumber, null)); + + final Set expectedCreateLeases = getExpectedLeasesForGraphA(shards, sequenceNumber, position); assertThat(createLeases, equalTo(expectedCreateLeases)); verify(shardDetector, times(2)).listShards(); @@ -1079,19 +1248,33 @@ public class HierarchicalShardSyncerTest { throws Exception { final String shardId0 = "shardId-0"; final String shardId1 = "shardId-1"; - final SequenceNumberRange sequenceRange = ShardObjectHelper.newSequenceNumberRange("342980", null); - final List shards = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange), - ShardObjectHelper.newShard(shardId1, null, null, sequenceRange)); + final HashKeyRange range1 = ShardObjectHelper.newHashKeyRange(ShardObjectHelper.MIN_HASH_KEY, BigInteger.ONE.toString()); + final HashKeyRange range2 = ShardObjectHelper.newHashKeyRange(new BigInteger("2").toString(), ShardObjectHelper.MAX_HASH_KEY); + final SequenceNumberRange sequenceRange = ShardObjectHelper.newSequenceNumberRange("11", null); + final List shards = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange, range1), + ShardObjectHelper.newShard(shardId1, null, null, sequenceRange, range2)); + final Set expectedLeaseKeys = new HashSet<>(Arrays.asList(shardId0, shardId1)); - testCheckAndCreateLeaseForShardsIfMissing(shards, initialPosition); + testCheckAndCreateLeaseForShardsIfMissing(shards, initialPosition, expectedLeaseKeys); } private void testCheckAndCreateLeaseForShardsIfMissing(final List shards, - final InitialPositionInStreamExtended initialPosition) throws Exception { + final InitialPositionInStreamExtended initialPosition, + final Set expectedLeaseKeys) throws Exception { + testCheckAndCreateLeaseForShardsIfMissing(shards, initialPosition, expectedLeaseKeys, Collections.emptyList()); + } + + private void testCheckAndCreateLeaseForShardsIfMissing(final List shards, + final InitialPositionInStreamExtended initialPosition, + final Set expectedLeaseKeys, + final List existingLeases) throws Exception { + final ArgumentCaptor leaseCaptor = ArgumentCaptor.forClass(Lease.class); when(shardDetector.listShards()).thenReturn(shards); - when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList()); + when(shardDetector.listShardsWithFilter(any())).thenReturn(getFilteredShards(shards, initialPosition)); + when(dynamoDBLeaseRefresher.listLeases()).thenReturn(existingLeases); + when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(existingLeases.isEmpty()); when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true); hierarchicalShardSyncer @@ -1102,16 +1285,15 @@ public class HierarchicalShardSyncerTest { final Set leaseKeys = leases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set leaseSequenceNumbers = leases.stream().map(Lease::checkpoint) .collect(Collectors.toSet()); - final Set expectedLeaseKeys = shards.stream().map(Shard::shardId).collect(Collectors.toSet()); + final Set expectedSequenceNumbers = new HashSet<>(Collections .singletonList(new ExtendedSequenceNumber(initialPosition.getInitialPositionInStream().name()))); - assertThat(leases.size(), equalTo(shards.size())); + assertThat(leases.size(), equalTo(expectedLeaseKeys.size())); assertThat(leaseKeys, equalTo(expectedLeaseKeys)); assertThat(leaseSequenceNumbers, equalTo(expectedSequenceNumbers)); - verify(shardDetector).listShards(); - verify(dynamoDBLeaseRefresher, times(shards.size())).createLeaseIfNotExists(any(Lease.class)); + verify(dynamoDBLeaseRefresher, times(expectedLeaseKeys.size())).createLeaseIfNotExists(any(Lease.class)); verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); } @@ -1177,49 +1359,30 @@ public class HierarchicalShardSyncerTest { assertThat(newLeases.get(0).leaseKey(), equalTo(lastShardId)); } -// /** -// * Test CheckIfDescendantAndAddNewLeasesForAncestors (initial position Latest) -// * Shard structure (each level depicts a stream segment): -// * 0 1 2 3 4 5- shards till epoch 102 -// * \ / \ / | | -// * 6 7 4 5- shards from epoch 103 - 205 -// * \ / | /\ -// * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) -// * Current leases: (3, 4, 5) -// */ + /** + * Test CheckIfDescendantAndAddNewLeasesForAncestors + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Current leases: (3, 4, 5) + * Initial position: LATEST + * Expected leases: (2, 6) + */ @Test - public void testDetermineNewLeasesToCreateSplitMergeLatest1() { + public void testDetermineNewLeasesToCreateSplitMergeLatestA_PartialHashRange1() { final List shards = constructShardListForGraphA(); - final List currentLeases = Arrays.asList(newLease("shardId-3"), newLease("shardId-4"), - newLease("shardId-5")); - final Map shardIdToShardMap = HierarchicalShardSyncer.constructShardIdToShardMap(shards); - final Map> shardIdToChildShardIdsMap = HierarchicalShardSyncer - .constructShardIdToChildShardIdsMap(shardIdToShardMap); - - final HierarchicalShardSyncer.LeaseSynchronizer nonEmptyLeaseTableSynchronizer = - new HierarchicalShardSyncer.NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap); - - final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(nonEmptyLeaseTableSynchronizer, - shards, currentLeases, INITIAL_POSITION_LATEST); - + final List shardIdsOfCurrentLeases = Arrays.asList("shardId-3", "shardId-4", "shardId-5"); final Map expectedShardIdCheckpointMap = new HashMap<>(); - expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.TRIM_HORIZON); - expectedShardIdCheckpointMap.put("shardId-9", ExtendedSequenceNumber.TRIM_HORIZON); - expectedShardIdCheckpointMap.put("shardId-10", ExtendedSequenceNumber.TRIM_HORIZON); - expectedShardIdCheckpointMap.put("shardId-6", ExtendedSequenceNumber.LATEST); expectedShardIdCheckpointMap.put("shardId-2", ExtendedSequenceNumber.LATEST); - expectedShardIdCheckpointMap.put("shardId-7", ExtendedSequenceNumber.TRIM_HORIZON); - - assertThat(newLeases.size(), equalTo(expectedShardIdCheckpointMap.size())); - for (Lease lease : newLeases) { - assertThat("Unexpected lease: " + lease, expectedShardIdCheckpointMap.containsKey(lease.leaseKey()), - equalTo(true)); - assertThat(lease.checkpoint(), equalTo(expectedShardIdCheckpointMap.get(lease.leaseKey()))); - } + expectedShardIdCheckpointMap.put("shardId-6", ExtendedSequenceNumber.LATEST); + assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_LATEST, expectedShardIdCheckpointMap); } /** - * Test CheckIfDescendantAndAddNewLeasesForAncestors (initial position Latest) + * Test CheckIfDescendantAndAddNewLeasesForAncestors * Shard structure (each level depicts a stream segment): * 0 1 2 3 4 5- shards till epoch 102 * \ / \ / | | @@ -1227,12 +1390,796 @@ public class HierarchicalShardSyncerTest { * \ / | /\ * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) * Current leases: (4, 5, 7) + * Initial position: LATEST + * Expected leases: (6) + * */ @Test - public void testDetermineNewLeasesToCreateSplitMergeLatest2() { + public void testDetermineNewLeasesToCreateSplitMergeLatestA_PartialHashRange2() { final List shards = constructShardListForGraphA(); - final List currentLeases = Arrays.asList(newLease("shardId-4"), newLease("shardId-5"), - newLease("shardId-7")); + final List shardIdsOfCurrentLeases = Arrays.asList("shardId-4", "shardId-5", "shardId-7"); + final Map expectedShardIdCheckpointMap = new HashMap<>(); + expectedShardIdCheckpointMap.put("shardId-6", ExtendedSequenceNumber.LATEST); + assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_LATEST, expectedShardIdCheckpointMap); + } + + /** + * Test CheckIfDescendantAndAddNewLeasesForAncestors + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Current leases: (2, 6) + * Initial position: LATEST + * Expected leases: (3, 4, 9, 10) + */ + @Test + public void testDetermineNewLeasesToCreateSplitMergeLatestA_PartialHashRange3() { + final List shards = constructShardListForGraphA(); + final List shardIdsOfCurrentLeases = Arrays.asList("shardId-2", "shardId-6"); + final Map expectedShardIdCheckpointMap = new HashMap<>(); + expectedShardIdCheckpointMap.put("shardId-3", ExtendedSequenceNumber.LATEST); + expectedShardIdCheckpointMap.put("shardId-4", ExtendedSequenceNumber.LATEST); + expectedShardIdCheckpointMap.put("shardId-9", ExtendedSequenceNumber.LATEST); + expectedShardIdCheckpointMap.put("shardId-10", ExtendedSequenceNumber.LATEST); + assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_LATEST, expectedShardIdCheckpointMap); + } + + /** + * Test CheckIfDescendantAndAddNewLeasesForAncestors + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Current leases: (4, 9, 10) + * Initial position: LATEST + * Expected leases: (8) + */ + @Test + public void testDetermineNewLeasesToCreateSplitMergeLatestA_PartialHashRange4() { + final List shards = constructShardListForGraphA(); + final List shardIdsOfCurrentLeases = Arrays.asList("shardId-4", "shardId-9", "shardId-10"); + final Map expectedShardIdCheckpointMap = new HashMap<>(); + expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.LATEST); + assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_LATEST, expectedShardIdCheckpointMap); + } + + /** + * Test CheckIfDescendantAndAddNewLeasesForAncestors + * Helper method to construct a shard list for graph C. Graph C is defined below. Shard structure (y-axis is + * epochs): 0 1 2 3 - shards till + * / \ | \ / + * 4 5 1 6 - shards from epoch 103 - 205 + * / \ / \ | | + * 7 8 9 10 1 6 + * shards from epoch 206 (open - no ending sequenceNumber) + * Current leases: (9, 10) + * Initial position: LATEST + * Expected leases: (1, 6, 7, 8) + */ + @Test + public void testDetermineNewLeasesToCreateSplitMergeLatestC_PartialHashRange5() { + final List shards = constructShardListForGraphC(); + final List shardIdsOfCurrentLeases = Arrays.asList("shardId-9", "shardId-10"); + final Map expectedShardIdCheckpointMap = new HashMap<>(); + expectedShardIdCheckpointMap.put("shardId-1", ExtendedSequenceNumber.LATEST); + expectedShardIdCheckpointMap.put("shardId-6", ExtendedSequenceNumber.LATEST); + expectedShardIdCheckpointMap.put("shardId-7", ExtendedSequenceNumber.LATEST); + expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.LATEST); + assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_LATEST, expectedShardIdCheckpointMap); + } + + /** + * Test CheckIfDescendantAndAddNewLeasesForAncestors + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Current leases: (4, 5, 6, 7) + * Initial position: LATEST + * Expected leases: empty set + */ + @Test + public void testDetermineNewLeasesToCreateSplitMergeLatestA_CompleteHashRange() { + final List shards = constructShardListForGraphA(); + final List shardIdsOfCurrentLeases = Arrays.asList("shardId-4", "shardId-5", "shardId-6", "shardId-7"); + final Map expectedNoNewLeases = Collections.emptyMap(); + assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_LATEST, expectedNoNewLeases); + } + + /** + * Test CheckIfDescendantAndAddNewLeasesForAncestors + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Current leases: (0, 1, 2, 3, 4, 5, 6, 7) + * Initial position: LATEST + * Expected leases: empty set + */ + @Test + public void testDetermineNewLeasesToCreateSplitMergeLatestA_CompleteHashRangeWithoutGC() { + final List shards = constructShardListForGraphA(); + final List shardIdsOfCurrentLeases = Arrays.asList("shardId-0", "shardId-1", "shardId-2", "shardId-3", + "shardId-4", "shardId-5", "shardId-6", "shardId-7"); + final Map expectedNoNewLeases = Collections.emptyMap(); + assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_LATEST, expectedNoNewLeases); + } + + /** + * Test CheckIfDescendantAndAddNewLeasesForAncestors + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Current leases: empty set + * Initial position: LATEST + * Expected leases: (4, 8, 9, 10) + */ + @Test + public void testDetermineNewLeasesToCreateSplitMergeLatestA_EmptyLeaseTable() { + final List shards = constructShardListForGraphA(); + final List shardIdsOfCurrentLeases = Collections.emptyList(); + final Map expectedShardIdCheckpointMap = new HashMap<>(); + expectedShardIdCheckpointMap.put("shardId-4", ExtendedSequenceNumber.LATEST); + expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.LATEST); + expectedShardIdCheckpointMap.put("shardId-9", ExtendedSequenceNumber.LATEST); + expectedShardIdCheckpointMap.put("shardId-10", ExtendedSequenceNumber.LATEST); + assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_LATEST, expectedShardIdCheckpointMap); + } + + /** + * Test CheckIfDescendantAndAddNewLeasesForAncestors + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Current leases: (0, 1, 4, 7, 9, 10) + * Initial position: LATEST + * Expected leases: empty set + */ + @Test + public void testDetermineNewLeasesToCreateSplitMergeLatestA_CompleteHashRangeAcrossDifferentEpochs() { + final List shards = constructShardListForGraphA(); + final List shardIdsOfCurrentLeases = Arrays.asList("shardId-0", "shardId-1", "shardId-4", "shardId-7", + "shardId-9", "shardId-10"); + final Map expectedNoNewLeases = Collections.emptyMap(); + assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_LATEST, expectedNoNewLeases); + } + + /* + * Shard structure (x-axis is epochs): + * 0 3 6 9 + * \ / \ / \ / + * 2 5 8 + * / \ / \ / \ + * 1 4 7 10 + * + * Current leases: (6) + * Initial position: LATEST + * Expected leases: (7) + */ + @Test + public void testDetermineNewLeasesToCreateSplitMergeLatestB_PartialHashRange() { + final List shards = constructShardListForGraphB(); + final List shardIdsOfCurrentLeases = Arrays.asList("shardId-6"); + final Map expectedShardIdCheckpointMap = new HashMap<>(); + expectedShardIdCheckpointMap.put("shardId-7", ExtendedSequenceNumber.LATEST); + assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_LATEST, expectedShardIdCheckpointMap); + } + + /* + * Shard structure (x-axis is epochs): + * 0 3 6 9 + * \ / \ / \ / + * 2 5 8 + * / \ / \ / \ + * 1 4 7 10 + * + * Current leases: (5) + * Initial position: LATEST + * Expected leases: empty set + */ + @Test + public void testDetermineNewLeasesToCreateSplitMergeLatestB_CompleteHashRange() { + final List shards = constructShardListForGraphB(); + final List shardIdsOfCurrentLeases = Arrays.asList("shardId-5"); + final Map expectedNoNewLeases = Collections.emptyMap(); + assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_LATEST, expectedNoNewLeases); + } + + /* + * Shard structure (x-axis is epochs): + * 0 3 6 9 + * \ / \ / \ / + * 2 5 8 + * / \ / \ / \ + * 1 4 7 10 + * + * Current leases: (0, 1, 2, 3, 4, 5) + * Initial position: LATEST + * Expected leases: empty set + */ + @Test + public void testDetermineNewLeasesToCreateSplitMergeLatestB_CompleteHashRangeWithoutGC() { + final List shards = constructShardListForGraphB(); + final List shardIdsOfCurrentLeases = Arrays.asList("shardId-0", "shardId-1", "shardId-2", "shardId-3", + "shardId-4", "shardId-5"); + final Map expectedNoNewLeases = Collections.emptyMap(); + assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_LATEST, expectedNoNewLeases); + } + + /* + * Shard structure (x-axis is epochs): + * 0 3 6 9 + * \ / \ / \ / + * 2 5 8 + * / \ / \ / \ + * 1 4 7 10 + * + * Current leases: empty set + * Initial position: LATEST + * Expected leases: (9, 10) + */ + @Test + public void testDetermineNewLeasesToCreateSplitMergeLatestB_EmptyLeaseTable() { + final List shards = constructShardListForGraphB(); + final List shardIdsOfCurrentLeases = Collections.emptyList(); + final Map expectedShardIdCheckpointMap = new HashMap<>(); + expectedShardIdCheckpointMap.put("shardId-9", ExtendedSequenceNumber.LATEST); + expectedShardIdCheckpointMap.put("shardId-10", ExtendedSequenceNumber.LATEST); + assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_LATEST, expectedShardIdCheckpointMap); + } + + /** + * Test CheckIfDescendantAndAddNewLeasesForAncestors + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Current leases: (3, 4, 5) + * Initial position: TRIM_HORIZON + * Expected leases: (0, 1, 2) + */ + @Test + public void testDetermineNewLeasesToCreateSplitMergeHorizonA_PartialHashRange1() { + final List shards = constructShardListForGraphA(); + final List shardIdsOfCurrentLeases = Arrays.asList("shardId-3", "shardId-4", "shardId-5"); + final Map expectedShardIdCheckpointMap = new HashMap<>(); + expectedShardIdCheckpointMap.put("shardId-0", ExtendedSequenceNumber.TRIM_HORIZON); + expectedShardIdCheckpointMap.put("shardId-1", ExtendedSequenceNumber.TRIM_HORIZON); + expectedShardIdCheckpointMap.put("shardId-2", ExtendedSequenceNumber.TRIM_HORIZON); + assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_TRIM_HORIZON, expectedShardIdCheckpointMap); + } + + /** + * Test CheckIfDescendantAndAddNewLeasesForAncestors + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Current leases: (4, 5, 7) + * Initial position: TRIM_HORIZON + * Expected leases: (0, 1) + */ + @Test + public void testDetermineNewLeasesToCreateSplitMergeHorizonA_PartialHashRange2() { + final List shards = constructShardListForGraphA(); + final List shardIdsOfCurrentLeases = Arrays.asList("shardId-4", "shardId-5", "shardId-7"); + final Map expectedShardIdCheckpointMap = new HashMap<>(); + expectedShardIdCheckpointMap.put("shardId-0", ExtendedSequenceNumber.TRIM_HORIZON); + expectedShardIdCheckpointMap.put("shardId-1", ExtendedSequenceNumber.TRIM_HORIZON); + assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_TRIM_HORIZON, expectedShardIdCheckpointMap); + } + + /** + * Test CheckIfDescendantAndAddNewLeasesForAncestors + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Current leases: (2, 6) + * Initial position: TRIM_HORIZON + * Expected leases: (3, 4, 5) + */ + @Test + public void testDetermineNewLeasesToCreateSplitMergeHorizonA_PartialHashRange3() { + final List shards = constructShardListForGraphA(); + final List shardIdsOfCurrentLeases = Arrays.asList("shardId-2", "shardId-6"); + final Map expectedShardIdCheckpointMap = new HashMap<>(); + expectedShardIdCheckpointMap.put("shardId-3", ExtendedSequenceNumber.TRIM_HORIZON); + expectedShardIdCheckpointMap.put("shardId-4", ExtendedSequenceNumber.TRIM_HORIZON); + expectedShardIdCheckpointMap.put("shardId-5", ExtendedSequenceNumber.TRIM_HORIZON); + assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_TRIM_HORIZON, expectedShardIdCheckpointMap); + } + + /** + * Test CheckIfDescendantAndAddNewLeasesForAncestors + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Current leases: (4, 9, 10) + * Initial position: TRIM_HORIZON + * Expected leases: (0, 1, 2, 3) + */ + @Test + public void testDetermineNewLeasesToCreateSplitMergeHorizonA_PartialHashRange4() { + final List shards = constructShardListForGraphA(); + final List shardIdsOfCurrentLeases = Arrays.asList("shardId-4", "shardId-9", "shardId-10"); + final Map expectedShardIdCheckpointMap = new HashMap<>(); + expectedShardIdCheckpointMap.put("shardId-0", ExtendedSequenceNumber.TRIM_HORIZON); + expectedShardIdCheckpointMap.put("shardId-1", ExtendedSequenceNumber.TRIM_HORIZON); + expectedShardIdCheckpointMap.put("shardId-2", ExtendedSequenceNumber.TRIM_HORIZON); + expectedShardIdCheckpointMap.put("shardId-3", ExtendedSequenceNumber.TRIM_HORIZON); + assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_TRIM_HORIZON, expectedShardIdCheckpointMap); + } + + /** + * Test CheckIfDescendantAndAddNewLeasesForAncestors + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Current leases: (4, 5, 6, 7) + * Initial position: TRIM_HORIZON + * Expected leases: empty set + */ + @Test + public void testDetermineNewLeasesToCreateSplitMergeHorizonA_CompleteHashRange() { + final List shards = constructShardListForGraphA(); + final List shardIdsOfCurrentLeases = Arrays.asList("shardId-4", "shardId-5", "shardId-6", "shardId-7"); + final Map expectedNoNewLeases = Collections.emptyMap(); + assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_TRIM_HORIZON, expectedNoNewLeases); + } + + /** + * Test CheckIfDescendantAndAddNewLeasesForAncestors + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Current leases: (0, 1, 2, 3, 4, 5, 6, 7) + * Initial position: TRIM_HORIZON + * Expected leases: empty set + */ + @Test + public void testDetermineNewLeasesToCreateSplitMergeHorizonA_CompleteHashRangeWithoutGC() { + final List shards = constructShardListForGraphA(); + final List shardIdsOfCurrentLeases = Arrays.asList("shardId-0", "shardId-1", "shardId-2", "shardId-3", + "shardId-4", "shardId-5", "shardId-6", "shardId-7"); + final Map expectedNoNewLeases = Collections.emptyMap(); + assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_TRIM_HORIZON, expectedNoNewLeases); + } + + /** + * Test CheckIfDescendantAndAddNewLeasesForAncestors + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Current leases: empty set + * Initial position: TRIM_HORIZON + * Expected leases: (0, 1, 2, 3, 4, 5) + */ + @Test + public void testDetermineNewLeasesToCreateSplitMergeHorizonA_EmptyLeaseTable() { + final List shards = constructShardListForGraphA(); + final List shardIdsOfCurrentLeases = Collections.emptyList(); + final Map expectedShardIdCheckpointMap = new HashMap<>(); + expectedShardIdCheckpointMap.put("shardId-0", ExtendedSequenceNumber.TRIM_HORIZON); + expectedShardIdCheckpointMap.put("shardId-1", ExtendedSequenceNumber.TRIM_HORIZON); + expectedShardIdCheckpointMap.put("shardId-2", ExtendedSequenceNumber.TRIM_HORIZON); + expectedShardIdCheckpointMap.put("shardId-3", ExtendedSequenceNumber.TRIM_HORIZON); + expectedShardIdCheckpointMap.put("shardId-4", ExtendedSequenceNumber.TRIM_HORIZON); + expectedShardIdCheckpointMap.put("shardId-5", ExtendedSequenceNumber.TRIM_HORIZON); + assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_TRIM_HORIZON, expectedShardIdCheckpointMap); + } + + /** + * Test CheckIfDescendantAndAddNewLeasesForAncestors + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Current leases: (0, 1, 4, 7, 9, 10) + * Initial position: TRIM_HORIZON + * Expected leases: empty set + */ + @Test + public void testDetermineNewLeasesToCreateSplitMergeHorizonA_CompleteHashRangeAcrossDifferentEpochs() { + final List shards = constructShardListForGraphA(); + final List shardIdsOfCurrentLeases = Arrays.asList("shardId-0", "shardId-1", "shardId-4", "shardId-7", + "shardId-9", "shardId-10"); + final Map expectedNoNewLeases = Collections.emptyMap(); + assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_TRIM_HORIZON, expectedNoNewLeases); + } + + /* + * Shard structure (x-axis is epochs): + * 0 3 6 9 + * \ / \ / \ / + * 2 5 8 + * / \ / \ / \ + * 1 4 7 10 + * + * Current leases: (6) + * Initial position: TRIM_HORIZON + * Expected leases: (7) + */ +// TODO: Account for out-of-order lease creation in TRIM_HORIZON and AT_TIMESTAMP cases +// @Test +// public void testDetermineNewLeasesToCreateSplitMergeHorizonB_PartialHashRange() { +// final List shards = constructShardListForGraphB(); +// final List shardIdsOfCurrentLeases = Arrays.asList("shardId-6"); +// final Map expectedShardIdCheckpointMap = new HashMap<>(); +// expectedShardIdCheckpointMap.put("shardId-7", ExtendedSequenceNumber.TRIM_HORIZON); +// assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_TRIM_HORIZON, expectedShardIdCheckpointMap); +// } + + /* + * Shard structure (x-axis is epochs): + * 0 3 6 9 + * \ / \ / \ / + * 2 5 8 + * / \ / \ / \ + * 1 4 7 10 + * + * Current leases: (5) + * Initial position: TRIM_HORIZON + * Expected leases: empty set + */ + @Test + public void testDetermineNewLeasesToCreateSplitMergeHorizonB_CompleteHashRange() { + final List shards = constructShardListForGraphB(); + final List shardIdsOfCurrentLeases = Arrays.asList("shardId-5"); + final Map expectedNoNewLeases = Collections.emptyMap(); + assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_TRIM_HORIZON, expectedNoNewLeases); + } + + /* + * Shard structure (x-axis is epochs): + * 0 3 6 9 + * \ / \ / \ / + * 2 5 8 + * / \ / \ / \ + * 1 4 7 10 + * + * Current leases: (0, 1, 2, 3, 4, 5) + * Initial position: TRIM_HORIZON + * Expected leases: empty set + */ + @Test + public void testDetermineNewLeasesToCreateSplitMergeHorizonB_CompleteHashRangeWithoutGC() { + final List shards = constructShardListForGraphB(); + final List shardIdsOfCurrentLeases = Arrays.asList("shardId-0", "shardId-1", "shardId-2", "shardId-3", + "shardId-4", "shardId-5"); + final Map expectedNoNewLeases = Collections.emptyMap(); + assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_TRIM_HORIZON, expectedNoNewLeases); + } + + /* + * Shard structure (x-axis is epochs): + * 0 3 6 9 + * \ / \ / \ / + * 2 5 8 + * / \ / \ / \ + * 1 4 7 10 + * + * Current leases: empty set + * Initial position: TRIM_HORIZON + * Expected leases: (0, 1) + */ + @Test + public void testDetermineNewLeasesToCreateSplitMergeHorizonB_EmptyLeaseTable() { + final List shards = constructShardListForGraphB(); + final List shardIdsOfCurrentLeases = Collections.emptyList(); + final Map expectedShardIdCheckpointMap = new HashMap<>(); + expectedShardIdCheckpointMap.put("shardId-0", ExtendedSequenceNumber.TRIM_HORIZON); + expectedShardIdCheckpointMap.put("shardId-1", ExtendedSequenceNumber.TRIM_HORIZON); + assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_TRIM_HORIZON, expectedShardIdCheckpointMap); + } + + /** + * Test CheckIfDescendantAndAddNewLeasesForAncestors + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Current leases: (3, 4, 5) + * Initial position: AT_TIMESTAMP(1000) + * Expected leases: (0, 1, 2) + */ + @Test + public void testDetermineNewLeasesToCreateSplitMergeAtTimestampA_PartialHashRange1() { + final List shards = constructShardListForGraphA(); + final List shardIdsOfCurrentLeases = Arrays.asList("shardId-3", "shardId-4", "shardId-5"); + final Map expectedShardIdCheckpointMap = new HashMap<>(); + expectedShardIdCheckpointMap.put("shardId-0", ExtendedSequenceNumber.AT_TIMESTAMP); + expectedShardIdCheckpointMap.put("shardId-1", ExtendedSequenceNumber.AT_TIMESTAMP); + expectedShardIdCheckpointMap.put("shardId-2", ExtendedSequenceNumber.AT_TIMESTAMP); + assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_AT_TIMESTAMP, expectedShardIdCheckpointMap); + } + + /** + * Test CheckIfDescendantAndAddNewLeasesForAncestors + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Current leases: (4, 5, 7) + * Initial position: AT_TIMESTAMP(1000) + * Expected leases: (0, 1) + */ + @Test + public void testDetermineNewLeasesToCreateSplitMergeAtTimestampA_PartialHashRange2() { + final List shards = constructShardListForGraphA(); + final List shardIdsOfCurrentLeases = Arrays.asList("shardId-4", "shardId-5", "shardId-7"); + final Map expectedShardIdCheckpointMap = new HashMap<>(); + expectedShardIdCheckpointMap.put("shardId-0", ExtendedSequenceNumber.AT_TIMESTAMP); + expectedShardIdCheckpointMap.put("shardId-1", ExtendedSequenceNumber.AT_TIMESTAMP); + assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_AT_TIMESTAMP, expectedShardIdCheckpointMap); + } + + /** + * Test CheckIfDescendantAndAddNewLeasesForAncestors + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Current leases: (2, 6) + * Initial position: AT_TIMESTAMP(1000) + * Expected leases: (3, 4, 5) + */ + @Test + public void testDetermineNewLeasesToCreateSplitMergeAtTimestampA_PartialHashRange3() { + final List shards = constructShardListForGraphA(); + final List shardIdsOfCurrentLeases = Arrays.asList("shardId-2", "shardId-6"); + final Map expectedShardIdCheckpointMap = new HashMap<>(); + expectedShardIdCheckpointMap.put("shardId-3", ExtendedSequenceNumber.AT_TIMESTAMP); + expectedShardIdCheckpointMap.put("shardId-4", ExtendedSequenceNumber.AT_TIMESTAMP); + expectedShardIdCheckpointMap.put("shardId-5", ExtendedSequenceNumber.AT_TIMESTAMP); + assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_AT_TIMESTAMP, expectedShardIdCheckpointMap); + } + + /** + * Test CheckIfDescendantAndAddNewLeasesForAncestors + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Current leases: (4, 9, 10) + * Initial position: AT_TIMESTAMP(1000) + * Expected leases: (0, 1, 2, 3) + */ + @Test + public void testDetermineNewLeasesToCreateSplitMergeAtTimestampA_PartialHashRange4() { + final List shards = constructShardListForGraphA(); + final List shardIdsOfCurrentLeases = Arrays.asList("shardId-4", "shardId-9", "shardId-10"); + final Map expectedShardIdCheckpointMap = new HashMap<>(); + expectedShardIdCheckpointMap.put("shardId-0", ExtendedSequenceNumber.AT_TIMESTAMP); + expectedShardIdCheckpointMap.put("shardId-1", ExtendedSequenceNumber.AT_TIMESTAMP); + expectedShardIdCheckpointMap.put("shardId-2", ExtendedSequenceNumber.AT_TIMESTAMP); + expectedShardIdCheckpointMap.put("shardId-3", ExtendedSequenceNumber.AT_TIMESTAMP); + assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_AT_TIMESTAMP, expectedShardIdCheckpointMap); + } + + /** + * Test CheckIfDescendantAndAddNewLeasesForAncestors + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Current leases: (4, 5, 6, 7) + * Initial position: AT_TIMESTAMP(1000) + * Expected leases: empty set + */ + @Test + public void testDetermineNewLeasesToCreateSplitMergeAtTimestampA_CompleteHashRange() { + final List shards = constructShardListForGraphA(); + final List shardIdsOfCurrentLeases = Arrays.asList("shardId-4", "shardId-5", "shardId-6", "shardId-7"); + final Map expectedNoNewLeases = Collections.emptyMap(); + assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_AT_TIMESTAMP, expectedNoNewLeases); + } + + /** + * Test CheckIfDescendantAndAddNewLeasesForAncestors + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Current leases: (0, 1, 2, 3, 4, 5, 6, 7) + * Initial position: AT_TIMESTAMP(1000) + * Expected leases: empty set + */ + @Test + public void testDetermineNewLeasesToCreateSplitMergeAtTimestampA_CompleteHashRangeWithoutGC() { + final List shards = constructShardListForGraphA(); + final List shardIdsOfCurrentLeases = Arrays.asList("shardId-0", "shardId-1", "shardId-2", "shardId-3", + "shardId-4", "shardId-5", "shardId-6", "shardId-7"); + final Map expectedNoNewLeases = Collections.emptyMap(); + assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_AT_TIMESTAMP, expectedNoNewLeases); + } + + /** + * Test CheckIfDescendantAndAddNewLeasesForAncestors + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Current leases: empty set + * Initial position: AT_TIMESTAMP(1000) + * Expected leases: (0, 1, 2, 3, 4, 5) + */ + @Test + public void testDetermineNewLeasesToCreateSplitMergeAtTimestampA_EmptyLeaseTable() { + final List shards = constructShardListForGraphA(); + final List shardIdsOfCurrentLeases = Collections.emptyList(); + final Map expectedShardIdCheckpointMap = new HashMap<>(); + expectedShardIdCheckpointMap.put("shardId-0", ExtendedSequenceNumber.AT_TIMESTAMP); + expectedShardIdCheckpointMap.put("shardId-1", ExtendedSequenceNumber.AT_TIMESTAMP); + expectedShardIdCheckpointMap.put("shardId-2", ExtendedSequenceNumber.AT_TIMESTAMP); + expectedShardIdCheckpointMap.put("shardId-3", ExtendedSequenceNumber.AT_TIMESTAMP); + expectedShardIdCheckpointMap.put("shardId-4", ExtendedSequenceNumber.AT_TIMESTAMP); + expectedShardIdCheckpointMap.put("shardId-5", ExtendedSequenceNumber.AT_TIMESTAMP); + assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_AT_TIMESTAMP, expectedShardIdCheckpointMap); + } + + /** + * Test CheckIfDescendantAndAddNewLeasesForAncestors + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Current leases: (0, 1, 4, 7, 9, 10) + * Initial position: AT_TIMESTAMP(1000) + * Expected leases: empty set + */ + @Test + public void testDetermineNewLeasesToCreateSplitMergeAtTimestampA_CompleteHashRangeAcrossDifferentEpochs() { + final List shards = constructShardListForGraphA(); + final List shardIdsOfCurrentLeases = Arrays.asList("shardId-0", "shardId-1", "shardId-4", "shardId-7", + "shardId-9", "shardId-10"); + final Map expectedNoNewLeases = Collections.emptyMap(); + assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_AT_TIMESTAMP, expectedNoNewLeases); + } + + /* + * Shard structure (x-axis is epochs): + * 0 3 6 9 + * \ / \ / \ / + * 2 5 8 + * / \ / \ / \ + * 1 4 7 10 + * + * Current leases: (6) + * Initial position: AT_TIMESTAMP(1000) + * Expected leases: (7) + */ +// TODO: Account for out-of-order lease creation in TRIM_HORIZON and AT_TIMESTAMP cases +// @Test +// public void testDetermineNewLeasesToCreateSplitMergeAtTimestampB_PartialHashRange() { +// final List shards = constructShardListForGraphB(); +// final List shardIdsOfCurrentLeases = Arrays.asList("shardId-6"); +// final Map expectedShardIdCheckpointMap = new HashMap<>(); +// expectedShardIdCheckpointMap.put("shardId-7", ExtendedSequenceNumber.AT_TIMESTAMP); +// assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_AT_TIMESTAMP, expectedShardIdCheckpointMap); +// } + + /* + * Shard structure (x-axis is epochs): + * 0 3 6 9 + * \ / \ / \ / + * 2 5 8 + * / \ / \ / \ + * 1 4 7 10 + * + * Current leases: (5) + * Initial position: AT_TIMESTAMP(1000) + * Expected leases: empty set + */ + @Test + public void testDetermineNewLeasesToCreateSplitMergeAtTimestampB_CompleteHashRange() { + final List shards = constructShardListForGraphB(); + final List shardIdsOfCurrentLeases = Arrays.asList("shardId-5"); + final Map expectedNoNewLeases = Collections.emptyMap(); + assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_AT_TIMESTAMP, expectedNoNewLeases); + } + + /* + * Shard structure (x-axis is epochs): + * 0 3 6 9 + * \ / \ / \ / + * 2 5 8 + * / \ / \ / \ + * 1 4 7 10 + * + * Current leases: (0, 1, 2, 3, 4, 5) + * Initial position: AT_TIMESTAMP(1000) + * Expected leases: empty set + */ + @Test + public void testDetermineNewLeasesToCreateSplitMergeAtTimestampB_CompleteHashRangeWithoutGC() { + final List shards = constructShardListForGraphB(); + final List shardIdsOfCurrentLeases = Arrays.asList("shardId-0", "shardId-1", "shardId-2", "shardId-3", + "shardId-4", "shardId-5"); + final Map expectedNoNewLeases = Collections.emptyMap(); + assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_AT_TIMESTAMP, expectedNoNewLeases); + } + + /* + * Shard structure (x-axis is epochs): + * 0 3 6 9 + * \ / \ / \ / + * 2 5 8 + * / \ / \ / \ + * 1 4 7 10 + * + * Current leases: empty set + * Initial position: AT_TIMESTAMP(1000) + * Expected leases: (0, 1) + */ + @Test + public void testDetermineNewLeasesToCreateSplitMergeAtTimestampB_EmptyLeaseTable() { + final List shards = constructShardListForGraphB(); + final List shardIdsOfCurrentLeases = Collections.emptyList(); + final Map expectedShardIdCheckpointMap = new HashMap<>(); + expectedShardIdCheckpointMap.put("shardId-0", ExtendedSequenceNumber.AT_TIMESTAMP); + expectedShardIdCheckpointMap.put("shardId-1", ExtendedSequenceNumber.AT_TIMESTAMP); + assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_AT_TIMESTAMP, expectedShardIdCheckpointMap); + } + + private void assertExpectedLeasesAreCreated(List shards, + List shardIdsOfCurrentLeases, + InitialPositionInStreamExtended initialPosition, + Map expectedShardIdCheckpointMap) { + + final List currentLeases = shardIdsOfCurrentLeases.stream() + .map(shardId -> newLease(shardId)).collect(Collectors.toList()); final Map shardIdToShardMap = HierarchicalShardSyncer.constructShardIdToShardMap(shards); final Map> shardIdToChildShardIdsMap = HierarchicalShardSyncer @@ -1242,13 +2189,7 @@ public class HierarchicalShardSyncerTest { new HierarchicalShardSyncer.NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap); final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(nonEmptyLeaseTableSynchronizer, - shards, currentLeases, INITIAL_POSITION_LATEST); - - final Map expectedShardIdCheckpointMap = new HashMap<>(); - expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.TRIM_HORIZON); - expectedShardIdCheckpointMap.put("shardId-9", ExtendedSequenceNumber.TRIM_HORIZON); - expectedShardIdCheckpointMap.put("shardId-10", ExtendedSequenceNumber.TRIM_HORIZON); - expectedShardIdCheckpointMap.put("shardId-6", ExtendedSequenceNumber.LATEST); + shards, currentLeases, initialPosition); assertThat(newLeases.size(), equalTo(expectedShardIdCheckpointMap.size())); for (Lease lease : newLeases) { @@ -1258,243 +2199,6 @@ public class HierarchicalShardSyncerTest { } } -// /** -// * Test CheckIfDescendantAndAddNewLeasesForAncestors (initial position TrimHorizon) -// * Shard structure (each level depicts a stream segment): -// * 0 1 2 3 4 5- shards till epoch 102 -// * \ / \ / | | -// * 6 7 4 5- shards from epoch 103 - 205 -// * \ / | /\ -// * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) -// * Current leases: (3, 4, 5) -// */ - @Test - public void testDetermineNewLeasesToCreateSplitMergeHorizon1() { - final List shards = constructShardListForGraphA(); - final List currentLeases = Arrays.asList(newLease("shardId-3"), newLease("shardId-4"), - newLease("shardId-5")); - - final Map shardIdToShardMap = HierarchicalShardSyncer.constructShardIdToShardMap(shards); - final Map> shardIdToChildShardIdsMap = HierarchicalShardSyncer - .constructShardIdToChildShardIdsMap(shardIdToShardMap); - - final HierarchicalShardSyncer.LeaseSynchronizer nonEmptyLeaseTableSynchronizer = - new HierarchicalShardSyncer.NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap); - - final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(nonEmptyLeaseTableSynchronizer, - shards, currentLeases, INITIAL_POSITION_TRIM_HORIZON); - - final Set leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); - final List checkpoints = newLeases.stream().map(Lease::checkpoint) - .collect(Collectors.toList()); - final Set checkpoint = new HashSet<>(checkpoints); - - final Set expectedLeaseKeys = new HashSet<>(Arrays.asList("shardId-0", "shardId-1", "shardId-2", - "shardId-6", "shardId-7", "shardId-8", "shardId-9", "shardId-10")); - final Set expectedCheckpoint = new HashSet<>( - Collections.singletonList(ExtendedSequenceNumber.TRIM_HORIZON)); - - assertThat(newLeases.size(), equalTo(expectedLeaseKeys.size())); - assertThat(checkpoints.size(), equalTo(expectedLeaseKeys.size())); - assertThat(leaseKeys, equalTo(expectedLeaseKeys)); - assertThat(checkpoint, equalTo(expectedCheckpoint)); - } - -// /** -// * Test CheckIfDescendantAndAddNewLeasesForAncestors (initial position TrimHorizon) -// * Shard structure (each level depicts a stream segment): -// * 0 1 2 3 4 5- shards till epoch 102 -// * \ / \ / | | -// * 6 7 4 5- shards from epoch 103 - 205 -// * \ / | /\ -// * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) -// * Current leases: (4, 5, 7) -// */ - @Test - public void testDetermineNewLeasesToCreateSplitMergeHorizon2() { - final List shards = constructShardListForGraphA(); - final List currentLeases = Arrays.asList(newLease("shardId-4"), newLease("shardId-5"), - newLease("shardId-7")); - - final Map shardIdToShardMap = HierarchicalShardSyncer.constructShardIdToShardMap(shards); - final Map> shardIdToChildShardIdsMap = HierarchicalShardSyncer - .constructShardIdToChildShardIdsMap(shardIdToShardMap); - - final HierarchicalShardSyncer.LeaseSynchronizer nonEmptyLeaseTableSynchronizer = - new HierarchicalShardSyncer.NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap); - - final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(nonEmptyLeaseTableSynchronizer, - shards, currentLeases, INITIAL_POSITION_TRIM_HORIZON); - - final Set leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); - final List checkpoints = newLeases.stream().map(Lease::checkpoint) - .collect(Collectors.toList()); - final Set checkpoint = new HashSet<>(checkpoints); - - final Set expectedLeaseKeys = new HashSet<>( - Arrays.asList("shardId-8", "shardId-9", "shardId-10", "shardId-6", "shardId-0", "shardId-1")); - final Set expectedCheckpoint = new HashSet<>( - Collections.singletonList(ExtendedSequenceNumber.TRIM_HORIZON)); - - assertThat(newLeases.size(), equalTo(expectedLeaseKeys.size())); - assertThat(checkpoints.size(), equalTo(expectedLeaseKeys.size())); - assertThat(leaseKeys, equalTo(expectedLeaseKeys)); - assertThat(checkpoint, equalTo(expectedCheckpoint)); - } - -// /** -// * Test CheckIfDescendantAndAddNewLeasesForAncestors (initial position TrimHorizon) -// * For shard graph B (see the construct method doc for structure). -// * -// * Current leases: empty set -// */ - @Test - public void testDetermineNewLeasesToCreateGraphBNoInitialLeasesTrim() { - final List shards = constructShardListForGraphB(); - final List currentLeases = new ArrayList<>(); - - final Map shardIdToShardMap = HierarchicalShardSyncer.constructShardIdToShardMap(shards); - final Map> shardIdToChildShardIdsMap = HierarchicalShardSyncer - .constructShardIdToChildShardIdsMap(shardIdToShardMap); - - final HierarchicalShardSyncer.LeaseSynchronizer nonEmptyLeaseTableSynchronizer = - new HierarchicalShardSyncer.NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap); - - final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(nonEmptyLeaseTableSynchronizer, - shards, currentLeases, INITIAL_POSITION_TRIM_HORIZON); - - final Set leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); - final List checkpoints = newLeases.stream().map(Lease::checkpoint) - .collect(Collectors.toList()); - final Set checkpoint = new HashSet<>(checkpoints); - - final Set expectedCheckpoint = new HashSet<>( - Collections.singletonList(ExtendedSequenceNumber.TRIM_HORIZON)); - final Set expectedLeaseKeys = IntStream.range(0, 11).mapToObj(id -> String.format("shardId-%d", id)) - .collect(Collectors.toSet()); - - assertThat(newLeases.size(), equalTo(expectedLeaseKeys.size())); - assertThat(checkpoints.size(), equalTo(expectedLeaseKeys.size())); - assertThat(leaseKeys, equalTo(expectedLeaseKeys)); - assertThat(checkpoint, equalTo(expectedCheckpoint)); - } - -// /** -// * Test CheckIfDescendantAndAddNewLeasesForAncestors (initial position AT_TIMESTAMP) -// * Shard structure (each level depicts a stream segment): -// * 0 1 2 3 4 5- shards till epoch 102 -// * \ / \ / | | -// * 6 7 4 5- shards from epoch 103 - 205 -// * \ / | /\ -// * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) -// * Current leases: (3, 4, 5) -// */ - @Test - public void testDetermineNewLeasesToCreateSplitMergeAtTimestamp1() { - final List shards = constructShardListForGraphA(); - final List currentLeases = Arrays.asList(newLease("shardId-3"), newLease("shardId-4"), - newLease("shardId-5")); - - final Map shardIdToShardMap = HierarchicalShardSyncer.constructShardIdToShardMap(shards); - final Map> shardIdToChildShardIdsMap = HierarchicalShardSyncer - .constructShardIdToChildShardIdsMap(shardIdToShardMap); - - final HierarchicalShardSyncer.LeaseSynchronizer nonEmptyLeaseTableSynchronizer = - new HierarchicalShardSyncer.NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap); - - final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(nonEmptyLeaseTableSynchronizer, - shards, currentLeases, INITIAL_POSITION_AT_TIMESTAMP); - final Set leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); - final List checkpoints = newLeases.stream().map(Lease::checkpoint) - .collect(Collectors.toList()); - final Set checkpoint = new HashSet<>(checkpoints); - - final Set expectedLeaseKeys = new HashSet<>(Arrays.asList("shardId-0", "shardId-1", "shardId-2", - "shardId-6", "shardId-7", "shardId-8", "shardId-9", "shardId-10")); - final Set expectedCheckpoint = new HashSet<>( - Collections.singletonList(ExtendedSequenceNumber.AT_TIMESTAMP)); - - assertThat(newLeases.size(), equalTo(expectedLeaseKeys.size())); - assertThat(checkpoints.size(), equalTo(expectedLeaseKeys.size())); - assertThat(leaseKeys, equalTo(expectedLeaseKeys)); - assertThat(checkpoint, equalTo(expectedCheckpoint)); - } - -// /** -// * Test CheckIfDescendantAndAddNewLeasesForAncestors (initial position AT_TIMESTAMP) -// * Shard structure (each level depicts a stream segment): -// * 0 1 2 3 4 5- shards till epoch 102 -// * \ / \ / | | -// * 6 7 4 5- shards from epoch 103 - 205 -// * \ / | /\ -// * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) -// * Current leases: (4, 5, 7) -// */ - @Test - public void testDetermineNewLeasesToCreateSplitMergeAtTimestamp2() { - final List shards = constructShardListForGraphA(); - final List currentLeases = Arrays.asList(newLease("shardId-4"), newLease("shardId-5"), - newLease("shardId-7")); - - final Map shardIdToShardMap = HierarchicalShardSyncer.constructShardIdToShardMap(shards); - final Map> shardIdToChildShardIdsMap = HierarchicalShardSyncer - .constructShardIdToChildShardIdsMap(shardIdToShardMap); - - final HierarchicalShardSyncer.LeaseSynchronizer nonEmptyLeaseTableSynchronizer = - new HierarchicalShardSyncer.NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap); - - final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(nonEmptyLeaseTableSynchronizer, - shards, currentLeases, INITIAL_POSITION_AT_TIMESTAMP); - final Set leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); - final List checkpoints = newLeases.stream().map(Lease::checkpoint) - .collect(Collectors.toList()); - final Set checkpoint = new HashSet<>(checkpoints); - - final Set expectedLeaseKeys = new HashSet<>( - Arrays.asList("shardId-0", "shardId-1", "shardId-6", "shardId-8", "shardId-9", "shardId-10")); - final Set expectedCheckpoint = new HashSet<>( - Collections.singletonList(ExtendedSequenceNumber.AT_TIMESTAMP)); - - assertThat(newLeases.size(), equalTo(expectedLeaseKeys.size())); - assertThat(checkpoints.size(), equalTo(expectedLeaseKeys.size())); - assertThat(leaseKeys, equalTo(expectedLeaseKeys)); - assertThat(checkpoint, equalTo(expectedCheckpoint)); - } - - /** - * Test CheckIfDescendantAndAddNewLeasesForAncestors (initial position AT_TIMESTAMP) For shard graph B (see the - * construct method doc for structure). Current leases: empty set - */ - @Test - public void testDetermineNewLeasesToCreateGraphBNoInitialLeasesAtTimestamp() { - final List shards = constructShardListForGraphB(); - final List currentLeases = new ArrayList<>(); - - final Map shardIdToShardMap = HierarchicalShardSyncer.constructShardIdToShardMap(shards); - final Map> shardIdToChildShardIdsMap = HierarchicalShardSyncer - .constructShardIdToChildShardIdsMap(shardIdToShardMap); - - final HierarchicalShardSyncer.LeaseSynchronizer nonEmptyLeaseTableSynchronizer = - new HierarchicalShardSyncer.NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap); - - final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(nonEmptyLeaseTableSynchronizer, - shards, currentLeases, INITIAL_POSITION_AT_TIMESTAMP); - final Set leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); - final List checkpoints = newLeases.stream().map(Lease::checkpoint) - .collect(Collectors.toList()); - final Set checkpoint = new HashSet<>(checkpoints); - - final Set expectedLeaseKeys = IntStream.range(0, shards.size()) - .mapToObj(id -> String.format("shardId-%d", id)).collect(Collectors.toSet()); - final Set expectedCheckpoint = new HashSet<>( - Collections.singletonList(ExtendedSequenceNumber.AT_TIMESTAMP)); - - assertThat(newLeases.size(), equalTo(expectedLeaseKeys.size())); - assertThat(checkpoints.size(), equalTo(expectedLeaseKeys.size())); - assertThat(leaseKeys, equalTo(expectedLeaseKeys)); - assertThat(checkpoint, equalTo(expectedCheckpoint)); - } - /* * Helper method to construct a shard list for graph A. Graph A is defined below. Shard structure (y-axis is * epochs): 0 1 2 3 4 5- shards till @@ -1536,6 +2240,57 @@ public class HierarchicalShardSyncerTest { ShardObjectHelper.newHashKeyRange("800", ShardObjectHelper.MAX_HASH_KEY))); } + /** + * Helper method to mimic behavior of Kinesis ListShardsWithFilter calls. + */ + private static List getFilteredShards(List shards, InitialPositionInStreamExtended initialPosition) { + switch (initialPosition.getInitialPositionInStream()) { + case LATEST: + return shards.stream() + .filter(s -> s.sequenceNumberRange().endingSequenceNumber() == null) + .collect(Collectors.toList()); + case TRIM_HORIZON: + String minSeqNum = shards.stream() + .min(Comparator.comparingLong(s -> Long.parseLong(s.sequenceNumberRange().startingSequenceNumber()))) + .map(s -> s.sequenceNumberRange().startingSequenceNumber()) + .orElseThrow(RuntimeException::new); + return shards.stream() + .filter(s -> s.sequenceNumberRange().startingSequenceNumber().equals(minSeqNum)) + .collect(Collectors.toList()); + case AT_TIMESTAMP: + return shards.stream() + .filter(s -> new Date(Long.parseLong(s.sequenceNumberRange().startingSequenceNumber())) + .compareTo(initialPosition.getTimestamp()) <= 0) + .filter(s -> s.sequenceNumberRange().endingSequenceNumber() == null || + new Date(Long.parseLong(s.sequenceNumberRange().endingSequenceNumber())) + .compareTo(initialPosition.getTimestamp()) > 0) + .collect(Collectors.toList()); + } + throw new RuntimeException("Unsupported initial position " + initialPosition); + } + + /* + * Helper method to get expected shards for Graph A based on initial position in stream. Shard structure (y-axis is + * epochs): 0 1 2 3 4 5- shards till + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - + * shards from epoch 206 (open - no ending sequenceNumber) + */ + private Set getExpectedLeasesForGraphA(List shards, + ExtendedSequenceNumber sequenceNumber, + InitialPositionInStreamExtended initialPosition) { + final List filteredShards; + if (initialPosition.getInitialPositionInStream().equals(InitialPositionInStream.AT_TIMESTAMP)) { + // Lease creation for AT_TIMESTAMP should work the same as for TRIM_HORIZON - ignore shard filters + filteredShards = getFilteredShards(shards, INITIAL_POSITION_TRIM_HORIZON); + } else { + filteredShards = getFilteredShards(shards, initialPosition); + } + return new HashSet<>(createLeasesFromShards(filteredShards, sequenceNumber, null)); + } + // /* // * Helper method to construct a shard list for graph B. Graph B is defined below. // * Shard structure (x-axis is epochs): @@ -1571,12 +2326,53 @@ public class HierarchicalShardSyncerTest { ShardObjectHelper.newShard("shardId-10", null, "shardId-8", range6, hashRange1)); } + /** + * Helper method to construct a shard list for graph C. Graph C is defined below. Shard structure (y-axis is + * epochs): 0 1 2 3 - shards till + * / \ | \ / + * 4 5 1 6 - shards from epoch 103 - 205 + * / \ / \ | | + * 7 8 9 10 1 6 + * shards from epoch 206 (open - no ending sequenceNumber) + */ + private List constructShardListForGraphC() { + final SequenceNumberRange range0 = ShardObjectHelper.newSequenceNumberRange("11", "102"); + final SequenceNumberRange range1 = ShardObjectHelper.newSequenceNumberRange("11", null); + final SequenceNumberRange range2 = ShardObjectHelper.newSequenceNumberRange("103", null); + final SequenceNumberRange range3 = ShardObjectHelper.newSequenceNumberRange("103", "205"); + final SequenceNumberRange range4 = ShardObjectHelper.newSequenceNumberRange("206", null); + + return Arrays.asList( + ShardObjectHelper.newShard("shardId-0", null, null, range0, + ShardObjectHelper.newHashKeyRange("0", "399")), + ShardObjectHelper.newShard("shardId-1", null, null, range1, + ShardObjectHelper.newHashKeyRange("400", "499")), + ShardObjectHelper.newShard("shardId-2", null, null, range0, + ShardObjectHelper.newHashKeyRange("500", "599")), + ShardObjectHelper.newShard("shardId-3", null, null, range0, + ShardObjectHelper.newHashKeyRange("600", ShardObjectHelper.MAX_HASH_KEY)), + ShardObjectHelper.newShard("shardId-4", "shardId-0", null, range3, + ShardObjectHelper.newHashKeyRange("0", "199")), + ShardObjectHelper.newShard("shardId-5", "shardId-0", null, range3, + ShardObjectHelper.newHashKeyRange("200", "399")), + ShardObjectHelper.newShard("shardId-6", "shardId-2", "shardId-3", range2, + ShardObjectHelper.newHashKeyRange("500", ShardObjectHelper.MAX_HASH_KEY)), + ShardObjectHelper.newShard("shardId-7", "shardId-4", null, range4, + ShardObjectHelper.newHashKeyRange("0", "99")), + ShardObjectHelper.newShard("shardId-8", "shardId-4", null, range4, + ShardObjectHelper.newHashKeyRange("100", "199")), + ShardObjectHelper.newShard("shardId-9", "shardId-5", null, range4, + ShardObjectHelper.newHashKeyRange("200", "299")), + ShardObjectHelper.newShard("shardId-10", "shardId-5", null, range4, + ShardObjectHelper.newHashKeyRange("300", "399"))); + } + /** * Test CheckIfDescendantAndAddNewLeasesForAncestors when shardId is null */ @Test public void testCheckIfDescendantAndAddNewLeasesForAncestorsNullShardId() { - final Map memoizationContext = new HashMap<>(); + final MemoizationContext memoizationContext = new MemoizationContext(); assertThat(HierarchicalShardSyncer .checkIfDescendantAndAddNewLeasesForAncestors(null, INITIAL_POSITION_LATEST, null, null, @@ -1589,7 +2385,7 @@ public class HierarchicalShardSyncerTest { @Test public void testCheckIfDescendantAndAddNewLeasesForAncestorsTrimmedShard() { final String shardId = "shardId-trimmed"; - final Map memoizationContext = new HashMap<>(); + final MemoizationContext memoizationContext = new MemoizationContext(); assertThat(HierarchicalShardSyncer .checkIfDescendantAndAddNewLeasesForAncestors(shardId, INITIAL_POSITION_LATEST, null, @@ -1604,7 +2400,7 @@ public class HierarchicalShardSyncerTest { final String shardId = "shardId-current"; final Set shardIdsOfCurrentLeases = new HashSet<>(Collections.singletonList(shardId)); final Map newLeaseMap = Collections.emptyMap(); - final Map memoizationContext = new HashMap<>(); + final MemoizationContext memoizationContext = new MemoizationContext(); final Map kinesisShards = new HashMap<>(); kinesisShards.put(shardId, ShardObjectHelper.newShard(shardId, null, null, null)); @@ -1624,7 +2420,7 @@ public class HierarchicalShardSyncerTest { final String shardId = "shardId-9-1"; final Set shardIdsOfCurrentLeases = Collections.emptySet(); final Map newLeaseMap = Collections.emptyMap(); - final Map memoizationContext = new HashMap<>(); + final MemoizationContext memoizationContext = new MemoizationContext(); final Map kinesisShards = new HashMap<>(); kinesisShards.put(parentShardId, ShardObjectHelper.newShard(parentShardId, null, null, null)); @@ -2313,5 +3109,4 @@ public class HierarchicalShardSyncerTest { return lease; } - }