From 07606883758f50cab5e816a1b88df1c5a81b560e Mon Sep 17 00:00:00 2001 From: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> Date: Mon, 6 Jul 2020 14:50:21 -0400 Subject: [PATCH] KinesisShardSyncer only create leases for one level of leases (#49) Co-authored-by: Joshua Kim --- .../lib/worker/KinesisShardSyncer.java | 110 +- .../NonEmptyLeaseTableSynchronizer.java | 73 +- .../worker/ExceptionThrowingLeaseManager.java | 2 +- .../lib/worker/ShardSyncerTest.java | 1514 ++++++++++++++--- 4 files changed, 1376 insertions(+), 323 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardSyncer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardSyncer.java index 21890663..358c8136 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardSyncer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardSyncer.java @@ -31,6 +31,7 @@ import com.amazonaws.services.kinesis.leases.impl.LeaseManager; import com.amazonaws.services.kinesis.model.ShardFilter; import com.amazonaws.services.kinesis.model.ShardFilterType; import com.amazonaws.util.CollectionUtils; +import lombok.NoArgsConstructor; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.commons.lang3.StringUtils; @@ -457,7 +458,7 @@ class KinesisShardSyncer implements ShardSyncer { /** * Note: Package level access for testing purposes only. * Check if this shard is a descendant of a shard that is (or will be) processed. - * Create leases for the ancestors of this shard as required. + * Create leases for the first ancestor of this shard that needs to be processed, as required. * See javadoc of determineNewLeasesToCreate() for rules and example. * * @param shardId The shardId to check. 
@@ -473,9 +474,10 @@ class KinesisShardSyncer implements ShardSyncer { static boolean checkIfDescendantAndAddNewLeasesForAncestors(String shardId, InitialPositionInStreamExtended initialPosition, Set shardIdsOfCurrentLeases, Map shardIdToShardMapOfAllKinesisShards, - Map shardIdToLeaseMapOfNewShards, Map memoizationContext) { + Map shardIdToLeaseMapOfNewShards, MemoizationContext memoizationContext) { + + final Boolean previousValue = memoizationContext.isDescendant(shardId); - Boolean previousValue = memoizationContext.get(shardId); if (previousValue != null) { return previousValue; } @@ -495,10 +497,13 @@ class KinesisShardSyncer implements ShardSyncer { shard = shardIdToShardMapOfAllKinesisShards.get(shardId); parentShardIds = getParentShardIds(shard, shardIdToShardMapOfAllKinesisShards); for (String parentShardId : parentShardIds) { - // Check if the parent is a descendant, and include its ancestors. - if (checkIfDescendantAndAddNewLeasesForAncestors(parentShardId, initialPosition, - shardIdsOfCurrentLeases, shardIdToShardMapOfAllKinesisShards, shardIdToLeaseMapOfNewShards, - memoizationContext)) { + // Check if the parent is a descendant, and include its ancestors. Or, if the parent is NOT a + // descendant but we should create a lease for it anyway (e.g. to include in processing from + // TRIM_HORIZON or AT_TIMESTAMP). If either is true, then we mark the current shard as a descendant. 
+ final boolean isParentDescendant = checkIfDescendantAndAddNewLeasesForAncestors(parentShardId, + initialPosition, shardIdsOfCurrentLeases, shardIdToShardMapOfAllKinesisShards, + shardIdToLeaseMapOfNewShards, memoizationContext); + if (isParentDescendant || memoizationContext.shouldCreateLease(parentShardId)) { isDescendant = true; descendantParentShardIds.add(parentShardId); LOG.debug("Parent shard " + parentShardId + " is a descendant."); @@ -511,37 +516,76 @@ class KinesisShardSyncer implements ShardSyncer { if (isDescendant) { for (String parentShardId : parentShardIds) { if (!shardIdsOfCurrentLeases.contains(parentShardId)) { - LOG.debug("Need to create a lease for shardId " + parentShardId); KinesisClientLease lease = shardIdToLeaseMapOfNewShards.get(parentShardId); + + // If the lease for the parent shard does not already exist, there are two cases in which we + // would want to create it: + // - If we have already marked the parentShardId for lease creation in a prior recursive + // call. This could happen if we are trying to process from TRIM_HORIZON or AT_TIMESTAMP. + // - If the parent shard is not a descendant but the current shard is a descendant, then + // the parent shard is the oldest shard in the shard hierarchy that does not have an + // ancestor in the lease table (the adjacent parent is necessarily a descendant, and + // therefore covered in the lease table). So we should create a lease for the parent. 
+ if (lease == null) { - lease = newKCLLease(shardIdToShardMapOfAllKinesisShards.get(parentShardId)); - shardIdToLeaseMapOfNewShards.put(parentShardId, lease); + if (memoizationContext.shouldCreateLease(parentShardId) || + !descendantParentShardIds.contains(parentShardId)) { + LOG.debug("Need to create a lease for shardId " + parentShardId); + lease = newKCLLease(shardIdToShardMapOfAllKinesisShards.get(parentShardId)); + shardIdToLeaseMapOfNewShards.put(parentShardId, lease); + } } - if (descendantParentShardIds.contains(parentShardId) && !initialPosition - .getInitialPositionInStream().equals(InitialPositionInStream.AT_TIMESTAMP)) { - lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON); - } else { - lease.setCheckpoint(convertToCheckpoint(initialPosition)); + /** + * If the shard is a descendant and the specified initial position is AT_TIMESTAMP, then the + * checkpoint should be set to AT_TIMESTAMP, else to TRIM_HORIZON. For AT_TIMESTAMP, we will + * add a lease just like we do for TRIM_HORIZON. However we will only return back records + * with server-side timestamp at or after the specified initial position timestamp. + * + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5 - shards till epoch 102 + * \ / \ / | | + * 6 7 4 5 - shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * + * Current leases: (4, 5, 7) + * + * For the above example, suppose the initial position in stream is set to AT_TIMESTAMP with + * timestamp value 206. We will then create new leases for all the shards 0 and 1 (with + * checkpoint set AT_TIMESTAMP), even though these ancestor shards have an epoch less than + * 206. However as we begin processing the ancestor shards, their checkpoints would be + * updated to SHARD_END and their leases would then be deleted since they won't have records + * with server-side timestamp at/after 206. 
And after that we will begin processing the + * descendant shards with epoch at/after 206 and we will return the records that meet the + * timestamp requirement for these shards. + */ + if (lease != null) { + if (descendantParentShardIds.contains(parentShardId) && !initialPosition + .getInitialPositionInStream().equals(InitialPositionInStream.AT_TIMESTAMP)) { + lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON); + } else { + lease.setCheckpoint(convertToCheckpoint(initialPosition)); + } } } } } else { - // This shard should be included, if the customer wants to process all records in the stream or - // if the initial position is AT_TIMESTAMP. For AT_TIMESTAMP, we will add a lease just like we do - // for TRIM_HORIZON. However we will only return back records with server-side timestamp at or - // after the specified initial position timestamp. + // This shard is not a descendant, but should still be included if the customer wants to process all + // records in the stream or if the initial position is AT_TIMESTAMP. For AT_TIMESTAMP, we will add a + // lease just like we do for TRIM_HORIZON. However we will only return back records with server-side + // timestamp at or after the specified initial position timestamp. if (initialPosition.getInitialPositionInStream().equals(InitialPositionInStream.TRIM_HORIZON) || initialPosition.getInitialPositionInStream() .equals(InitialPositionInStream.AT_TIMESTAMP)) { - isDescendant = true; + memoizationContext.setShouldCreateLease(shardId, true); } } } } - memoizationContext.put(shardId, isDescendant); + memoizationContext.setIsDescendant(shardId, isDescendant); return isDescendant; } // CHECKSTYLE:ON CyclomaticComplexity @@ -834,4 +878,28 @@ class KinesisShardSyncer implements ShardSyncer { } + /** + * Helper class to pass around state between recursive traversals of shard hierarchy. 
+ */ + @NoArgsConstructor + static class MemoizationContext { + private Map isDescendantMap = new HashMap<>(); + private Map shouldCreateLeaseMap = new HashMap<>(); + + Boolean isDescendant(String shardId) { + return isDescendantMap.get(shardId); + } + + void setIsDescendant(String shardId, Boolean isDescendant) { + isDescendantMap.put(shardId, isDescendant); + } + + Boolean shouldCreateLease(String shardId) { + return shouldCreateLeaseMap.computeIfAbsent(shardId, x -> Boolean.FALSE); + } + + void setShouldCreateLease(String shardId, Boolean shouldCreateLease) { + shouldCreateLeaseMap.put(shardId, shouldCreateLease); + } + } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/NonEmptyLeaseTableSynchronizer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/NonEmptyLeaseTableSynchronizer.java index 53c42980..2a868d30 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/NonEmptyLeaseTableSynchronizer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/NonEmptyLeaseTableSynchronizer.java @@ -47,8 +47,10 @@ class NonEmptyLeaseTableSynchronizer implements LeaseSynchronizer { * Note: Package level access only for testing purposes. * * For each open (no ending sequence number) shard without open parents that doesn't already have a lease, - * determine if it is a descendent of any shard which is or will be processed (e.g. for which a lease exists): - * If so, set checkpoint of the shard to TrimHorizon and also create leases for ancestors if needed. + * determine if it is a descendant of any shard which is or will be processed (e.g. for which a lease exists): + * If so, create a lease for the first ancestor that needs to be processed (if needed). We will create leases + * for no more than one level in the ancestry tree. Once we find the first ancestor that needs to be processed, + * we will avoid creating leases for further descendants of that ancestor. 
* If not, set checkpoint of the shard to the initial position specified by the client. * To check if we need to create leases for ancestors, we use the following rules: * * If we began (or will begin) processing data for a shard, then we must reach end of that shard before @@ -67,10 +69,17 @@ class NonEmptyLeaseTableSynchronizer implements LeaseSynchronizer { * 0 1 2 3 4 5 - shards till epoch 102 * \ / \ / | | * 6 7 4 5 - shards from epoch 103 - 205 - * \ / | / \ - * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) - * Current leases: (3, 4, 5) - * New leases to create: (2, 6, 7, 8, 9, 10) + * \ / | / \ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * + * Current leases: (4, 5, 7) + * + * If initial position is LATEST: + * - New leases to create: (6) + * If initial position is TRIM_HORIZON: + * - New leases to create: (0, 1) + * If initial position is AT_TIMESTAMP(epoch=200): + * - New leases to create: (0, 1) * * The leases returned are sorted by the starting sequence number - following the same order * when persisting the leases in DynamoDB will ensure that we recover gracefully if we fail @@ -104,7 +113,8 @@ class NonEmptyLeaseTableSynchronizer implements LeaseSynchronizer { } List openShards = KinesisShardSyncer.getOpenShards(shards); - Map memoizationContext = new HashMap<>(); + final KinesisShardSyncer.MemoizationContext memoizationContext = new KinesisShardSyncer.MemoizationContext(); + // Iterate over the open shards and find those that don't have any lease entries. for (Shard shard : openShards) { @@ -115,43 +125,30 @@ class NonEmptyLeaseTableSynchronizer implements LeaseSynchronizer { } else if (inconsistentShardIds.contains(shardId)) { LOG.info("shardId " + shardId + " is an inconsistent child. 
Not creating a lease"); } else { - LOG.debug("Need to create a lease for shardId " + shardId); - KinesisClientLease newLease = KinesisShardSyncer.newKCLLease(shard); + LOG.debug("Beginning traversal of ancestry tree for shardId " + shardId); + + // A shard is a descendant if at least one if its ancestors exists in the lease table. + // We will create leases for only one level in the ancestry tree. Once we find the first ancestor + // that needs to be processed in order to complete the hash range, we will not create leases for + // further descendants of that ancestor. boolean isDescendant = KinesisShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(shardId, initialPosition, shardIdsOfCurrentLeases, shardIdToShardMapOfAllKinesisShards, shardIdToNewLeaseMap, memoizationContext); - /** - * If the shard is a descendant and the specified initial position is AT_TIMESTAMP, then the - * checkpoint should be set to AT_TIMESTAMP, else to TRIM_HORIZON. For AT_TIMESTAMP, we will add a - * lease just like we do for TRIM_HORIZON. However we will only return back records with server-side - * timestamp at or after the specified initial position timestamp. - * - * Shard structure (each level depicts a stream segment): - * 0 1 2 3 4 5 - shards till epoch 102 - * \ / \ / | | - * 6 7 4 5 - shards from epoch 103 - 205 - * \ / | /\ - * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) - * - * Current leases: empty set - * - * For the above example, suppose the initial position in stream is set to AT_TIMESTAMP with - * timestamp value 206. We will then create new leases for all the shards (with checkpoint set to - * AT_TIMESTAMP), including the ancestor shards with epoch less than 206. However as we begin - * processing the ancestor shards, their checkpoints would be updated to SHARD_END and their leases - * would then be deleted since they won't have records with server-side timestamp at/after 206. 
And - * after that we will begin processing the descendant shards with epoch at/after 206 and we will - * return the records that meet the timestamp requirement for these shards. - */ - if (isDescendant && !initialPosition.getInitialPositionInStream() - .equals(InitialPositionInStream.AT_TIMESTAMP)) { - newLease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON); - } else { + // If shard is a descendant, the leases for its ancestors were already created above. Open shards + // that are NOT descendants will not have leases yet, so we create them here. We will not create + // leases for open shards that ARE descendants yet - leases for these shards will be created upon + // SHARD_END of their parents. + if (!isDescendant) { + LOG.debug("ShardId " + shardId + " has no ancestors. Creating a lease."); + final KinesisClientLease newLease = KinesisShardSyncer.newKCLLease(shard); newLease.setCheckpoint(KinesisShardSyncer.convertToCheckpoint(initialPosition)); + LOG.debug("Set checkpoint of " + newLease.getLeaseKey() + " to " + newLease.getCheckpoint()); + shardIdToNewLeaseMap.put(shardId, newLease); + } else { + LOG.debug("ShardId " + shardId + " is a descendant whose ancestors should already have leases. 
" + + "Not creating a lease."); } - LOG.debug("Set checkpoint of " + newLease.getLeaseKey() + " to " + newLease.getCheckpoint()); - shardIdToNewLeaseMap.put(shardId, newLease); } } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ExceptionThrowingLeaseManager.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ExceptionThrowingLeaseManager.java index e7b6c285..7f53133b 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ExceptionThrowingLeaseManager.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ExceptionThrowingLeaseManager.java @@ -225,7 +225,7 @@ class ExceptionThrowingLeaseManager implements ILeaseManager @Override public boolean isLeaseTableEmpty() throws DependencyException, InvalidStateException, ProvisionedThroughputException { - return false; + return leaseManager.listLeases().isEmpty(); } } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java index 48d71f6d..208d6448 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java @@ -18,6 +18,9 @@ import java.io.File; import java.io.IOException; import java.math.BigInteger; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; import java.util.Date; import java.util.HashMap; import java.util.HashSet; @@ -29,8 +32,10 @@ import java.util.stream.Stream; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.dynamodbv2.local.embedded.DynamoDBEmbedded; +import com.amazonaws.services.kinesis.leases.impl.Lease; import com.amazonaws.services.kinesis.model.ShardFilter; import 
com.amazonaws.services.kinesis.model.ShardFilterType; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.junit.After; @@ -41,6 +46,7 @@ import org.junit.Test; import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.KinesisClientLibIOException; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ExceptionThrowingLeaseManager.ExceptionThrowingLeaseManagerMethods; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisShardSyncer.MemoizationContext; import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisLocalFileProxy; import com.amazonaws.services.kinesis.clientlibrary.proxies.util.KinesisLocalFileDataCreator; @@ -58,10 +64,14 @@ import com.amazonaws.services.kinesis.model.Shard; import junit.framework.Assert; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; /** * @@ -75,6 +85,7 @@ public class ShardSyncerTest { InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON); private static final InitialPositionInStreamExtended INITIAL_POSITION_AT_TIMESTAMP = InitialPositionInStreamExtended.newInitialPositionAtTimestamp(new Date(1000L)); + private static String LEASE_OWNER = "leaseOwner"; private final boolean cleanupLeasesOfCompletedShards = true; private static final int EXPONENT = 128; AmazonDynamoDB ddbClient = DynamoDBEmbedded.create().amazonDynamoDB(); @@ -236,7 +247,7 @@ public class ShardSyncerTest { * All open and closed shards within stream's retention period should be sync'ed when lease table is empty. 
*/ @Test - public final void testCheckAndCreateLeasesForNewShardsAtLatestWithEmptyLeaseTable() + public final void testCheckAndCreateLeasesForNewShardsAtLatestWithEmptyLeaseTable1() throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException, IOException { List shards = constructShardListForGraphA(); @@ -263,7 +274,7 @@ public class ShardSyncerTest { * We should only create leases for shards at LATEST when lease table is not empty. */ @Test - public final void testCheckAndCreateLeasesForNewShardsAtLatestWithPartialLeaseTable() + public final void testCheckAndCreateLeasesForNewShardsAtLatestWithPartialLeaseTable1() throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException, IOException { List shards = constructShardListForGraphA(); @@ -431,8 +442,11 @@ public class ShardSyncerTest { public final void testCheckAndCreateLeasesForNewShardsAtTrimHorizonAndClosedShard() throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException, IOException { + final Set expectedLeaseKeysToCreate = new HashSet<>(Arrays.asList( + "shardId-0", "shardId-1", "shardId-2", "shardId-3", "shardId-4", "shardId-5" + )); testCheckAndCreateLeasesForNewShardsAtSpecifiedPositionAndClosedShardImpl(null, - Integer.MAX_VALUE, INITIAL_POSITION_TRIM_HORIZON); + Integer.MAX_VALUE, INITIAL_POSITION_TRIM_HORIZON, expectedLeaseKeysToCreate); } /** @@ -446,12 +460,16 @@ public class ShardSyncerTest { public final void testCheckAndCreateLeasesForNewShardsAtTrimHorizonAndClosedShardWithDeleteLeaseExceptions() throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException, IOException { + + final Set expectedLeaseKeysToCreate = new HashSet<>(Arrays.asList( + "shardId-0", "shardId-1", "shardId-2", "shardId-3", "shardId-4", "shardId-5" + )); // Define the max calling count for lease manager methods. 
// From the Shard Graph, the max count of calling could be 10 int maxCallingCount = 10; for (int c = 1; c <= maxCallingCount; c = c + 2) { testCheckAndCreateLeasesForNewShardsAtSpecifiedPositionAndClosedShardImpl( - ExceptionThrowingLeaseManagerMethods.DELETELEASE, c, INITIAL_POSITION_TRIM_HORIZON); + ExceptionThrowingLeaseManagerMethods.DELETELEASE, c, INITIAL_POSITION_TRIM_HORIZON, expectedLeaseKeysToCreate); // Need to clean up lease manager every time after calling KinesisShardSyncer leaseManager.deleteAll(); } @@ -468,12 +486,15 @@ public class ShardSyncerTest { public final void testCheckAndCreateLeasesForNewShardsAtTrimHorizonAndClosedShardWithListLeasesExceptions() throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException, IOException { + final Set expectedLeaseKeysToCreate = new HashSet<>(Arrays.asList( + "shardId-0", "shardId-1", "shardId-2", "shardId-3", "shardId-4", "shardId-5" + )); // Define the max calling count for lease manager methods. 
// From the Shard Graph, the max count of calling could be 10 int maxCallingCount = 10; for (int c = 1; c <= maxCallingCount; c = c + 2) { testCheckAndCreateLeasesForNewShardsAtSpecifiedPositionAndClosedShardImpl( - ExceptionThrowingLeaseManagerMethods.LISTLEASES, c, INITIAL_POSITION_TRIM_HORIZON); + ExceptionThrowingLeaseManagerMethods.LISTLEASES, c, INITIAL_POSITION_TRIM_HORIZON, expectedLeaseKeysToCreate); // Need to clean up lease manager every time after calling KinesisShardSyncer leaseManager.deleteAll(); } @@ -490,12 +511,15 @@ public class ShardSyncerTest { public final void testCheckAndCreateLeasesForNewShardsAtTrimHorizonAndClosedShardWithCreateLeaseExceptions() throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException, IOException { + final Set expectedLeaseKeysToCreate = new HashSet<>(Arrays.asList( + "shardId-0", "shardId-1", "shardId-2", "shardId-3", "shardId-4", "shardId-5" + )); // Define the max calling count for lease manager methods. 
// From the Shard Graph, the max count of calling could be 10 - int maxCallingCount = 5; + int maxCallingCount = 1; for (int c = 1; c <= maxCallingCount; c = c + 2) { testCheckAndCreateLeasesForNewShardsAtSpecifiedPositionAndClosedShardImpl( - ExceptionThrowingLeaseManagerMethods.CREATELEASEIFNOTEXISTS, c,INITIAL_POSITION_TRIM_HORIZON); + ExceptionThrowingLeaseManagerMethods.CREATELEASEIFNOTEXISTS, c, INITIAL_POSITION_TRIM_HORIZON, expectedLeaseKeysToCreate); // Need to clean up lease manager every time after calling KinesisShardSyncer leaseManager.deleteAll(); } @@ -549,8 +573,11 @@ public class ShardSyncerTest { public final void testCheckAndCreateLeasesForNewShardsAtTimestampAndClosedShard() throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException, IOException { + final Set expectedLeaseKeysToCreate = new HashSet<>(Arrays.asList( + "shardId-8", "shardId-4", "shardId-9", "shardId-10" + )); testCheckAndCreateLeasesForNewShardsAtSpecifiedPositionAndClosedShardImpl(null, - Integer.MAX_VALUE, INITIAL_POSITION_AT_TIMESTAMP); + Integer.MAX_VALUE, INITIAL_POSITION_AT_TIMESTAMP, expectedLeaseKeysToCreate); } /** @@ -564,13 +591,16 @@ public class ShardSyncerTest { public final void testCheckAndCreateLeasesForNewShardsAtTimestampAndClosedShardWithDeleteLeaseExceptions() throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException, IOException { + final Set expectedLeaseKeysToCreate = new HashSet<>(Arrays.asList( + "shardId-8", "shardId-4", "shardId-9", "shardId-10" + )); // Define the max calling count for lease manager methods. 
// From the Shard Graph, the max count of calling could be 10 int maxCallingCount = 10; for (int c = 1; c <= maxCallingCount; c = c + 2) { testCheckAndCreateLeasesForNewShardsAtSpecifiedPositionAndClosedShardImpl( ExceptionThrowingLeaseManagerMethods.DELETELEASE, - c, INITIAL_POSITION_AT_TIMESTAMP); + c, INITIAL_POSITION_AT_TIMESTAMP, expectedLeaseKeysToCreate); // Need to clean up lease manager every time after calling KinesisShardSyncer leaseManager.deleteAll(); } @@ -587,13 +617,16 @@ public class ShardSyncerTest { public final void testCheckAndCreateLeasesForNewShardsAtTimestampAndClosedShardWithListLeasesExceptions() throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException, IOException { + final Set expectedLeaseKeysToCreate = new HashSet<>(Arrays.asList( + "shardId-8", "shardId-4", "shardId-9", "shardId-10" + )); // Define the max calling count for lease manager methods. // From the Shard Graph, the max count of calling could be 10 int maxCallingCount = 10; for (int c = 1; c <= maxCallingCount; c = c + 2) { testCheckAndCreateLeasesForNewShardsAtSpecifiedPositionAndClosedShardImpl( ExceptionThrowingLeaseManagerMethods.LISTLEASES, - c, INITIAL_POSITION_AT_TIMESTAMP); + c, INITIAL_POSITION_AT_TIMESTAMP, expectedLeaseKeysToCreate); // Need to clean up lease manager every time after calling KinesisShardSyncer leaseManager.deleteAll(); } @@ -610,13 +643,16 @@ public class ShardSyncerTest { public final void testCheckAndCreateLeasesForNewShardsAtTimestampAndClosedShardWithCreateLeaseExceptions() throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException, IOException { + final Set expectedLeaseKeysToCreate = new HashSet<>(Arrays.asList( + "shardId-8", "shardId-4", "shardId-9", "shardId-10" + )); // Define the max calling count for lease manager methods. 
// From the Shard Graph, the max count of calling could be 10 int maxCallingCount = 5; for (int c = 1; c <= maxCallingCount; c = c + 2) { testCheckAndCreateLeasesForNewShardsAtSpecifiedPositionAndClosedShardImpl( ExceptionThrowingLeaseManagerMethods.CREATELEASEIFNOTEXISTS, - c, INITIAL_POSITION_AT_TIMESTAMP); + c, INITIAL_POSITION_AT_TIMESTAMP, expectedLeaseKeysToCreate); // Need to clean up lease manager every time after calling KinesisShardSyncer leaseManager.deleteAll(); } @@ -626,7 +662,7 @@ public class ShardSyncerTest { private void testCheckAndCreateLeasesForNewShardsAtSpecifiedPositionAndClosedShardImpl( ExceptionThrowingLeaseManagerMethods exceptionMethod, int exceptionTime, - InitialPositionInStreamExtended position) + InitialPositionInStreamExtended position, Set expectedLeaseKeysToCreate) throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException, IOException { ExtendedSequenceNumber extendedSequenceNumber = @@ -634,35 +670,17 @@ public class ShardSyncerTest { List shards = constructShardListForGraphA(); File dataFile = KinesisLocalFileDataCreator.generateTempDataFile(shards, 2, "testBootstrap1"); dataFile.deleteOnExit(); - IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath()); + final IKinesisProxy kinesisProxy = spy(new KinesisLocalFileProxy(dataFile.getAbsolutePath())); + when(kinesisProxy.getShardList()).thenReturn(shards); + when(kinesisProxy.getShardListWithFilter(any())).thenReturn(getFilteredShards(shards, position)); retryCheckAndCreateLeaseForNewShards(kinesisProxy, exceptionMethod, exceptionTime, position); List newLeases = leaseManager.listLeases(); Map expectedShardIdToCheckpointMap = new HashMap(); - for (int i = 0; i < 11; i++) { - expectedShardIdToCheckpointMap.put("shardId-" + i, extendedSequenceNumber); - } - Assert.assertEquals(expectedShardIdToCheckpointMap.size(), newLeases.size()); - for (KinesisClientLease lease1 : newLeases) { - 
ExtendedSequenceNumber expectedCheckpoint = expectedShardIdToCheckpointMap.get(lease1.getLeaseKey()); - Assert.assertNotNull(expectedCheckpoint); - Assert.assertEquals(expectedCheckpoint, lease1.getCheckpoint()); - } + expectedLeaseKeysToCreate.forEach(l -> expectedShardIdToCheckpointMap.put(l, extendedSequenceNumber)); - KinesisClientLease closedShardLease = leaseManager.getLease("shardId-0"); - closedShardLease.setCheckpoint(ExtendedSequenceNumber.SHARD_END); - leaseManager.updateLease(closedShardLease); - expectedShardIdToCheckpointMap.remove(closedShardLease.getLeaseKey()); - KinesisClientLease childShardLease = leaseManager.getLease("shardId-6"); - childShardLease.setCheckpoint(new ExtendedSequenceNumber("34290")); - leaseManager.updateLease(childShardLease); - expectedShardIdToCheckpointMap.put(childShardLease.getLeaseKey(), new ExtendedSequenceNumber("34290")); - - retryCheckAndCreateLeaseForNewShards(kinesisProxy, exceptionMethod, exceptionTime, position); - - newLeases = leaseManager.listLeases(); Assert.assertEquals(expectedShardIdToCheckpointMap.size(), newLeases.size()); for (KinesisClientLease lease1 : newLeases) { ExtendedSequenceNumber expectedCheckpoint = expectedShardIdToCheckpointMap.get(lease1.getLeaseKey()); @@ -835,193 +853,333 @@ public class ShardSyncerTest { /** * Test CheckIfDescendantAndAddNewLeasesForAncestors (initial position Latest) * Shard structure (each level depicts a stream segment): - * 0 1 2 3 4 5- shards till epoch 102 - * \ / \ / | | - * 6 7 4 5- shards from epoch 103 - 205 - * \ / | /\ - * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) * Current leases: (3, 4, 5) + * Initial position: LATEST + * Leases to create: (2, 6) */ @Test - public final void testDetermineNewLeasesToCreateSplitMergeLatest1() { - List shards = 
constructShardListForGraphA(); - List currentLeases = new ArrayList(); + public final void testDetermineNewLeasesToCreateSplitMergeLatestA_PartialHashRange1() { + final List shards = constructShardListForGraphA(); + final List shardIdsOfCurrentLeases = Arrays.asList("shardId-3", "shardId-4", "shardId-5"); - currentLeases.add(newLease("shardId-3")); - currentLeases.add(newLease("shardId-4")); - currentLeases.add(newLease("shardId-5")); - - final LeaseSynchronizer leaseSynchronizer = getLeaseSynchronizer(shards, currentLeases); - - List newLeases = - shardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, INITIAL_POSITION_LATEST); - Map expectedShardIdCheckpointMap = + final Map expectedShardIdCheckpointMap = new HashMap(); - expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.TRIM_HORIZON); - expectedShardIdCheckpointMap.put("shardId-9", ExtendedSequenceNumber.TRIM_HORIZON); - expectedShardIdCheckpointMap.put("shardId-10", ExtendedSequenceNumber.TRIM_HORIZON); expectedShardIdCheckpointMap.put("shardId-6", ExtendedSequenceNumber.LATEST); expectedShardIdCheckpointMap.put("shardId-2", ExtendedSequenceNumber.LATEST); - expectedShardIdCheckpointMap.put("shardId-7", ExtendedSequenceNumber.TRIM_HORIZON); - Assert.assertEquals(expectedShardIdCheckpointMap.size(), newLeases.size()); - for (KinesisClientLease lease : newLeases) { - Assert.assertTrue("Unexpected lease: " + lease, - expectedShardIdCheckpointMap.containsKey(lease.getLeaseKey())); - Assert.assertEquals(expectedShardIdCheckpointMap.get(lease.getLeaseKey()), lease.getCheckpoint()); - } + testCheckIfDescendantAndAddNewLeasesForAncestors(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_LATEST, expectedShardIdCheckpointMap); } /** * Test CheckIfDescendantAndAddNewLeasesForAncestors (initial position Latest) * Shard structure (each level depicts a stream segment): - * 0 1 2 3 4 5- shards till epoch 102 - * \ / \ / | | - * 6 7 4 5- shards from epoch 103 - 205 - * \ / | /\ - * 8 
4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) * Current leases: (4, 5, 7) + * Initial position: LATEST + * Leases to create: (6) */ @Test - public final void testDetermineNewLeasesToCreateSplitMergeLatest2() { - List shards = constructShardListForGraphA(); - List currentLeases = new ArrayList(); + public final void testDetermineNewLeasesToCreateSplitMergeLatestA_PartialHashRange2() { + final List shards = constructShardListForGraphA(); + final List shardIdsOfCurrentLeases = Arrays.asList("shardId-4", "shardId-5", "shardId-7"); - currentLeases.add(newLease("shardId-4")); - currentLeases.add(newLease("shardId-5")); - currentLeases.add(newLease("shardId-7")); - - final LeaseSynchronizer leaseSynchronizer = getLeaseSynchronizer(shards, currentLeases); - - List newLeases = - shardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, INITIAL_POSITION_LATEST); - Map expectedShardIdCheckpointMap = + final Map expectedShardIdCheckpointMap = new HashMap(); - expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.TRIM_HORIZON); - expectedShardIdCheckpointMap.put("shardId-9", ExtendedSequenceNumber.TRIM_HORIZON); - expectedShardIdCheckpointMap.put("shardId-10", ExtendedSequenceNumber.TRIM_HORIZON); expectedShardIdCheckpointMap.put("shardId-6", ExtendedSequenceNumber.LATEST); - Assert.assertEquals(expectedShardIdCheckpointMap.size(), newLeases.size()); - for (KinesisClientLease lease : newLeases) { - Assert.assertTrue("Unexpected lease: " + lease, - expectedShardIdCheckpointMap.containsKey(lease.getLeaseKey())); - Assert.assertEquals(expectedShardIdCheckpointMap.get(lease.getLeaseKey()), lease.getCheckpoint()); - } + testCheckIfDescendantAndAddNewLeasesForAncestors(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_LATEST, 
expectedShardIdCheckpointMap); } /** * Test CheckIfDescendantAndAddNewLeasesForAncestors (initial position TrimHorizon) * Shard structure (each level depicts a stream segment): - * 0 1 2 3 4 5- shards till epoch 102 - * \ / \ / | | - * 6 7 4 5- shards from epoch 103 - 205 - * \ / | /\ - * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) - * Current leases: (3, 4, 5) + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Current leases: (2, 6) + * Initial position: LATEST + * Leases to create: (3, 4, 9, 10) */ @Test - public final void testDetermineNewLeasesToCreateSplitMergeHorizon1() { + public final void testDetermineNewLeasesToCreateSplitMergeLatestA_PartialHashRange3() { List shards = constructShardListForGraphA(); - List currentLeases = new ArrayList(); + final List shardIdsOfCurrentLeases = Arrays.asList("shardId-2", "shardId-6"); - currentLeases.add(newLease("shardId-3")); - currentLeases.add(newLease("shardId-4")); - currentLeases.add(newLease("shardId-5")); - - final LeaseSynchronizer leaseSynchronizer = getLeaseSynchronizer(shards, currentLeases); - - List newLeases = - shardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, INITIAL_POSITION_TRIM_HORIZON); Map expectedShardIdCheckpointMap = new HashMap(); - expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.TRIM_HORIZON); - expectedShardIdCheckpointMap.put("shardId-9", ExtendedSequenceNumber.TRIM_HORIZON); - expectedShardIdCheckpointMap.put("shardId-10", ExtendedSequenceNumber.TRIM_HORIZON); - expectedShardIdCheckpointMap.put("shardId-6", ExtendedSequenceNumber.TRIM_HORIZON); - expectedShardIdCheckpointMap.put("shardId-2", ExtendedSequenceNumber.TRIM_HORIZON); - expectedShardIdCheckpointMap.put("shardId-7", ExtendedSequenceNumber.TRIM_HORIZON); - expectedShardIdCheckpointMap.put("shardId-0", 
ExtendedSequenceNumber.TRIM_HORIZON); - expectedShardIdCheckpointMap.put("shardId-1", ExtendedSequenceNumber.TRIM_HORIZON); + expectedShardIdCheckpointMap.put("shardId-3", ExtendedSequenceNumber.LATEST); + expectedShardIdCheckpointMap.put("shardId-4", ExtendedSequenceNumber.LATEST); + expectedShardIdCheckpointMap.put("shardId-9", ExtendedSequenceNumber.LATEST); + expectedShardIdCheckpointMap.put("shardId-10", ExtendedSequenceNumber.LATEST); - Assert.assertEquals(expectedShardIdCheckpointMap.size(), newLeases.size()); - for (KinesisClientLease lease : newLeases) { - Assert.assertTrue("Unexpected lease: " + lease, - expectedShardIdCheckpointMap.containsKey(lease.getLeaseKey())); - Assert.assertEquals(expectedShardIdCheckpointMap.get(lease.getLeaseKey()), lease.getCheckpoint()); - } + testCheckIfDescendantAndAddNewLeasesForAncestors(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_LATEST, expectedShardIdCheckpointMap); } /** * Test CheckIfDescendantAndAddNewLeasesForAncestors (initial position TrimHorizon) * Shard structure (each level depicts a stream segment): - * 0 1 2 3 4 5- shards till epoch 102 - * \ / \ / | | - * 6 7 4 5- shards from epoch 103 - 205 - * \ / | /\ - * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) - * Current leases: (4, 5, 7) + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Current leases: (4, 9, 10) + * Initial position: LATEST + * Leases to create: (8) */ @Test - public final void testDetermineNewLeasesToCreateSplitMergeHorizon2() { - List shards = constructShardListForGraphA(); - List currentLeases = new ArrayList(); + public final void testDetermineNewLeasesToCreateSplitMergeLatestA_PartialHashRange4() { + final List shards = constructShardListForGraphA(); + final List shardIdsOfCurrentLeases = Arrays.asList("shardId-4", "shardId-9", "shardId-10"); - 
currentLeases.add(newLease("shardId-4")); - currentLeases.add(newLease("shardId-5")); - currentLeases.add(newLease("shardId-7")); - - final LeaseSynchronizer leaseSynchronizer = getLeaseSynchronizer(shards, currentLeases); - - List newLeases = - shardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, INITIAL_POSITION_TRIM_HORIZON); - Map expectedShardIdCheckpointMap = + final Map expectedShardIdCheckpointMap = new HashMap(); - expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.TRIM_HORIZON); - expectedShardIdCheckpointMap.put("shardId-9", ExtendedSequenceNumber.TRIM_HORIZON); - expectedShardIdCheckpointMap.put("shardId-10", ExtendedSequenceNumber.TRIM_HORIZON); - expectedShardIdCheckpointMap.put("shardId-6", ExtendedSequenceNumber.TRIM_HORIZON); - expectedShardIdCheckpointMap.put("shardId-0", ExtendedSequenceNumber.TRIM_HORIZON); - expectedShardIdCheckpointMap.put("shardId-1", ExtendedSequenceNumber.TRIM_HORIZON); + expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.LATEST); - Assert.assertEquals(expectedShardIdCheckpointMap.size(), newLeases.size()); - for (KinesisClientLease lease : newLeases) { - Assert.assertTrue("Unexpected lease: " + lease, - expectedShardIdCheckpointMap.containsKey(lease.getLeaseKey())); - Assert.assertEquals(expectedShardIdCheckpointMap.get(lease.getLeaseKey()), lease.getCheckpoint()); - } + testCheckIfDescendantAndAddNewLeasesForAncestors(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_LATEST, expectedShardIdCheckpointMap); } /** - * Test CheckIfDescendantAndAddNewLeasesForAncestors (initial position TrimHorizon) - * For shard graph B (see the construct method doc for structure). + * Test CheckIfDescendantAndAddNewLeasesForAncestors + * Helper method to construct a shard list for graph C. Graph C is defined below. 
Shard structure (y-axis is + * epochs): 0 1 2 3 - shards till + * / \ | \ / + * 4 5 1 6 - shards from epoch 103 - 205 + * / \ / \ | | + * 7 8 9 10 1 6 + * shards from epoch 206 (open - no ending sequenceNumber) + * Current leases: (9, 10) + * Initial position: LATEST + * Expected leases: (1, 6, 7, 8) + */ + @Test + public final void testDetermineNewLeasesToCreateSplitMergeLatestC_PartialHashRange5() { + final List shards = constructShardListForGraphC(); + final List shardIdsOfCurrentLeases = Arrays.asList("shardId-9", "shardId-10"); + final Map expectedShardIdCheckpointMap = new HashMap<>(); + expectedShardIdCheckpointMap.put("shardId-1", ExtendedSequenceNumber.LATEST); + expectedShardIdCheckpointMap.put("shardId-6", ExtendedSequenceNumber.LATEST); + expectedShardIdCheckpointMap.put("shardId-7", ExtendedSequenceNumber.LATEST); + expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.LATEST); + testCheckIfDescendantAndAddNewLeasesForAncestors(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_LATEST, expectedShardIdCheckpointMap); + } + + /** + * Test CheckIfDescendantAndAddNewLeasesForAncestors (initial position AT_TIMESTAMP) + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Current leases: (4, 5, 6, 7) + * Initial position: LATEST + * Leases to create: empty set + */ + @Test + public final void testDetermineNewLeasesToCreateSplitMergeLatestA_CompleteHashRange() { + final List shards = constructShardListForGraphA(); + final List shardIdsOfCurrentLeases = Arrays.asList("shardId-4", "shardId-5", "shardId-6", "shardId-7"); + final Map expectedShardIdCheckpointMap = Collections.emptyMap(); + testCheckIfDescendantAndAddNewLeasesForAncestors(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_LATEST, expectedShardIdCheckpointMap); + } + + /** + * Test 
CheckIfDescendantAndAddNewLeasesForAncestors + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Current leases: (0, 1, 2, 3, 4, 5, 6, 7) + * Initial position: LATEST + * Expected leases: empty set + */ + @Test + public final void testDetermineNewLeasesToCreateSplitMergeLatestA_CompleteHashRangeWithoutGC() { + final List shards = constructShardListForGraphA(); + final List shardIdsOfCurrentLeases = Arrays.asList("shardId-0", "shardId-1", "shardId-2", + "shardId-3", "shardId-4", "shardId-5", "shardId-6", "shardId-7"); + final Map expectedShardIdCheckpointMap = Collections.emptyMap(); + + testCheckIfDescendantAndAddNewLeasesForAncestors(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_LATEST, expectedShardIdCheckpointMap); + } + + /** + * Test CheckIfDescendantAndAddNewLeasesForAncestors + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Current leases: empty set + * Initial position: LATEST + * Expected leases: (4, 8, 9, 10) + */ + @Test + public void testDetermineNewLeasesToCreateSplitMergeLatestA_EmptyLeaseTable() { + final List shards = constructShardListForGraphA(); + final List shardIdsOfCurrentLeases = Collections.emptyList(); + final Map expectedShardIdCheckpointMap = new HashMap<>(); + expectedShardIdCheckpointMap.put("shardId-4", ExtendedSequenceNumber.LATEST); + expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.LATEST); + expectedShardIdCheckpointMap.put("shardId-9", ExtendedSequenceNumber.LATEST); + expectedShardIdCheckpointMap.put("shardId-10", ExtendedSequenceNumber.LATEST); + testCheckIfDescendantAndAddNewLeasesForAncestors(shards, shardIdsOfCurrentLeases, 
INITIAL_POSITION_LATEST, expectedShardIdCheckpointMap); + } + + /** + * Test CheckIfDescendantAndAddNewLeasesForAncestors + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Current leases: (0, 1, 4, 7, 9, 10) + * Initial position: LATEST + * Expected leases: empty set + */ + @Test + public void testDetermineNewLeasesToCreateSplitMergeLatestA_CompleteHashRangeAcrossDifferentEpochs() { + final List shards = constructShardListForGraphA(); + final List shardIdsOfCurrentLeases = Arrays.asList("shardId-0", "shardId-1", "shardId-4", "shardId-7", + "shardId-9", "shardId-10"); + final Map expectedNoNewLeases = Collections.emptyMap(); + testCheckIfDescendantAndAddNewLeasesForAncestors(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_LATEST, expectedNoNewLeases); + } + + /** + * Shard structure (x-axis is epochs): + * 0 1 shards till epoch 1049 + * \ / + * 2 shards from epoch 1050 - 1099 + * / \ + * 3 4 shards from epoch 1100 - 1149 + * \ / + * 5 shards from epoch 1150 - 1199 + * / \ + * 6 7 shards from epoch 1200 - 1249 + * \ / + * 8 shards from epoch 1250 - 1299 + * / \ + * 9 10 shards from epoch 1300 (open - no ending sequence number) + * + * Current leases: (6) + * Initial position: LATEST + * Expected leases: (7) + */ + @Test + public void testDetermineNewLeasesToCreateSplitMergeLatestB_PartialHashRange() { + final List shards = constructShardListForGraphB(); + final List shardIdsOfCurrentLeases = Arrays.asList("shardId-6"); + final Map expectedShardIdCheckpointMap = new HashMap<>(); + expectedShardIdCheckpointMap.put("shardId-7", ExtendedSequenceNumber.LATEST); + testCheckIfDescendantAndAddNewLeasesForAncestors(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_LATEST, expectedShardIdCheckpointMap); + } + + /** + * Shard structure (x-axis is epochs): + * 0 1 shards till epoch 1049 
+ * \ / + * 2 shards from epoch 1050 - 1099 + * / \ + * 3 4 shards from epoch 1100 - 1149 + * \ / + * 5 shards from epoch 1150 - 1199 + * / \ + * 6 7 shards from epoch 1200 - 1249 + * \ / + * 8 shards from epoch 1250 - 1299 + * / \ + * 9 10 shards from epoch 1300 (open - no ending sequence number) + * + * Current leases: (5) + * Initial position: LATEST + * Expected leases: empty set + */ + @Test + public void testDetermineNewLeasesToCreateSplitMergeLatestB_CompleteHashRange() { + final List shards = constructShardListForGraphB(); + final List shardIdsOfCurrentLeases = Arrays.asList("shardId-5"); + final Map expectedNoNewLeases = Collections.emptyMap(); + testCheckIfDescendantAndAddNewLeasesForAncestors(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_LATEST, expectedNoNewLeases); + } + + /** + * Shard structure (x-axis is epochs): + * 0 1 shards till epoch 1049 + * \ / + * 2 shards from epoch 1050 - 1099 + * / \ + * 3 4 shards from epoch 1100 - 1149 + * \ / + * 5 shards from epoch 1150 - 1199 + * / \ + * 6 7 shards from epoch 1200 - 1249 + * \ / + * 8 shards from epoch 1250 - 1299 + * / \ + * 9 10 shards from epoch 1300 (open - no ending sequence number) + * + * Current leases: (0, 1, 2, 3, 4, 5) + * Initial position: LATEST + * Expected leases: empty set + */ + @Test + public void testDetermineNewLeasesToCreateSplitMergeLatestB_CompleteHashRangeWithoutGC() { + final List shards = constructShardListForGraphB(); + final List shardIdsOfCurrentLeases = Arrays.asList("shardId-0", "shardId-1", "shardId-2", "shardId-3", + "shardId-4", "shardId-5"); + final Map expectedNoNewLeases = Collections.emptyMap(); + testCheckIfDescendantAndAddNewLeasesForAncestors(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_LATEST, expectedNoNewLeases); + } + + /** + * Shard structure (x-axis is epochs): + * 0 1 shards till epoch 1049 + * \ / + * 2 shards from epoch 1050 - 1099 + * / \ + * 3 4 shards from epoch 1100 - 1149 + * \ / + * 5 shards from epoch 1150 - 1199 + * / \ + * 6 7 shards 
from epoch 1200 - 1249 + * \ / + * 8 shards from epoch 1250 - 1299 + * / \ + * 9 10 shards from epoch 1300 (open - no ending sequence number) * * Current leases: empty set + * Initial position: LATEST + * Expected leases: (9, 10) */ @Test - public final void testDetermineNewLeasesToCreateGraphBNoInitialLeasesTrim() { - List shards = constructShardListForGraphB(); - List currentLeases = new ArrayList(); - final LeaseSynchronizer leaseSynchronizer = getLeaseSynchronizer(shards, currentLeases); - List newLeases = - shardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, INITIAL_POSITION_TRIM_HORIZON); - Map expectedShardIdCheckpointMap = - new HashMap(); - for (int i = 0; i < 11; i++) { - String expectedShardId = "shardId-" + i; - expectedShardIdCheckpointMap.put(expectedShardId, ExtendedSequenceNumber.TRIM_HORIZON); - } - - Assert.assertEquals(expectedShardIdCheckpointMap.size(), newLeases.size()); - for (KinesisClientLease lease : newLeases) { - Assert.assertTrue("Unexpected lease: " + lease, - expectedShardIdCheckpointMap.containsKey(lease.getLeaseKey())); - Assert.assertEquals(expectedShardIdCheckpointMap.get(lease.getLeaseKey()), lease.getCheckpoint()); - } + public void testDetermineNewLeasesToCreateSplitMergeLatestB_EmptyLeaseTable() { + final List shards = constructShardListForGraphB(); + final List shardIdsOfCurrentLeases = Collections.emptyList(); + final Map expectedShardIdCheckpointMap = new HashMap<>(); + expectedShardIdCheckpointMap.put("shardId-9", ExtendedSequenceNumber.LATEST); + expectedShardIdCheckpointMap.put("shardId-10", ExtendedSequenceNumber.LATEST); + testCheckIfDescendantAndAddNewLeasesForAncestors(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_LATEST, expectedShardIdCheckpointMap); } /** - * Test CheckIfDescendantAndAddNewLeasesForAncestors (initial position AT_TIMESTAMP) + * Test CheckIfDescendantAndAddNewLeasesForAncestors * Shard structure (each level depicts a stream segment): * 0 1 2 3 4 5- shards till epoch 
102 * \ / \ / | | @@ -1029,40 +1187,22 @@ public class ShardSyncerTest { * \ / | /\ * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) * Current leases: (3, 4, 5) + * Initial position: TRIM_HORIZON + * Expected leases: (0, 1, 2) */ @Test - public final void testDetermineNewLeasesToCreateSplitMergeAtTimestamp1() { - List shards = constructShardListForGraphA(); - List currentLeases = new ArrayList(); - - currentLeases.add(newLease("shardId-3")); - currentLeases.add(newLease("shardId-4")); - currentLeases.add(newLease("shardId-5")); - - final LeaseSynchronizer leaseSynchronizer = getLeaseSynchronizer(shards, currentLeases); - - List newLeases = - shardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, INITIAL_POSITION_AT_TIMESTAMP); - Map expectedShardIdCheckpointMap = new HashMap(); - expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.AT_TIMESTAMP); - expectedShardIdCheckpointMap.put("shardId-9", ExtendedSequenceNumber.AT_TIMESTAMP); - expectedShardIdCheckpointMap.put("shardId-10", ExtendedSequenceNumber.AT_TIMESTAMP); - expectedShardIdCheckpointMap.put("shardId-6", ExtendedSequenceNumber.AT_TIMESTAMP); - expectedShardIdCheckpointMap.put("shardId-2", ExtendedSequenceNumber.AT_TIMESTAMP); - expectedShardIdCheckpointMap.put("shardId-7", ExtendedSequenceNumber.AT_TIMESTAMP); - expectedShardIdCheckpointMap.put("shardId-0", ExtendedSequenceNumber.AT_TIMESTAMP); - expectedShardIdCheckpointMap.put("shardId-1", ExtendedSequenceNumber.AT_TIMESTAMP); - - Assert.assertEquals(expectedShardIdCheckpointMap.size(), newLeases.size()); - for (KinesisClientLease lease : newLeases) { - Assert.assertTrue("Unexpected lease: " + lease, - expectedShardIdCheckpointMap.containsKey(lease.getLeaseKey())); - Assert.assertEquals(expectedShardIdCheckpointMap.get(lease.getLeaseKey()), lease.getCheckpoint()); - } + public void testDetermineNewLeasesToCreateSplitMergeHorizonA_PartialHashRange1() { + final List shards = 
constructShardListForGraphA(); + final List shardIdsOfCurrentLeases = Arrays.asList("shardId-3", "shardId-4", "shardId-5"); + final Map expectedShardIdCheckpointMap = new HashMap<>(); + expectedShardIdCheckpointMap.put("shardId-0", ExtendedSequenceNumber.TRIM_HORIZON); + expectedShardIdCheckpointMap.put("shardId-1", ExtendedSequenceNumber.TRIM_HORIZON); + expectedShardIdCheckpointMap.put("shardId-2", ExtendedSequenceNumber.TRIM_HORIZON); + testCheckIfDescendantAndAddNewLeasesForAncestors(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_TRIM_HORIZON, expectedShardIdCheckpointMap); } /** - * Test CheckIfDescendantAndAddNewLeasesForAncestors (initial position AT_TIMESTAMP) + * Test CheckIfDescendantAndAddNewLeasesForAncestors * Shard structure (each level depicts a stream segment): * 0 1 2 3 4 5- shards till epoch 102 * \ / \ / | | @@ -1070,72 +1210,576 @@ public class ShardSyncerTest { * \ / | /\ * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) * Current leases: (4, 5, 7) + * Initial position: TRIM_HORIZON + * Expected leases: (0, 1) */ @Test - public final void testDetermineNewLeasesToCreateSplitMergeAtTimestamp2() { - List shards = constructShardListForGraphA(); - List currentLeases = new ArrayList(); - - currentLeases.add(newLease("shardId-4")); - currentLeases.add(newLease("shardId-5")); - currentLeases.add(newLease("shardId-7")); - - final LeaseSynchronizer leaseSynchronizer = getLeaseSynchronizer(shards, currentLeases); - - List newLeases = - shardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, INITIAL_POSITION_AT_TIMESTAMP); - Map expectedShardIdCheckpointMap = new HashMap(); - expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.AT_TIMESTAMP); - expectedShardIdCheckpointMap.put("shardId-9", ExtendedSequenceNumber.AT_TIMESTAMP); - expectedShardIdCheckpointMap.put("shardId-10", ExtendedSequenceNumber.AT_TIMESTAMP); - expectedShardIdCheckpointMap.put("shardId-6", 
ExtendedSequenceNumber.AT_TIMESTAMP); - expectedShardIdCheckpointMap.put("shardId-0", ExtendedSequenceNumber.AT_TIMESTAMP); - expectedShardIdCheckpointMap.put("shardId-1", ExtendedSequenceNumber.AT_TIMESTAMP); - - Assert.assertEquals(expectedShardIdCheckpointMap.size(), newLeases.size()); - for (KinesisClientLease lease : newLeases) { - Assert.assertTrue("Unexpected lease: " + lease, - expectedShardIdCheckpointMap.containsKey(lease.getLeaseKey())); - Assert.assertEquals(expectedShardIdCheckpointMap.get(lease.getLeaseKey()), lease.getCheckpoint()); - } + public void testDetermineNewLeasesToCreateSplitMergeHorizonA_PartialHashRange2() { + final List shards = constructShardListForGraphA(); + final List shardIdsOfCurrentLeases = Arrays.asList("shardId-4", "shardId-5", "shardId-7"); + final Map expectedShardIdCheckpointMap = new HashMap<>(); + expectedShardIdCheckpointMap.put("shardId-0", ExtendedSequenceNumber.TRIM_HORIZON); + expectedShardIdCheckpointMap.put("shardId-1", ExtendedSequenceNumber.TRIM_HORIZON); + testCheckIfDescendantAndAddNewLeasesForAncestors(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_TRIM_HORIZON, expectedShardIdCheckpointMap); } /** - * Test CheckIfDescendantAndAddNewLeasesForAncestors (initial position AT_TIMESTAMP) - * For shard graph B (see the construct method doc for structure). 
- * Current leases: empty set + * Test CheckIfDescendantAndAddNewLeasesForAncestors + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Current leases: (2, 6) + * Initial position: TRIM_HORIZON + * Expected leases: (3, 4, 5) */ @Test - public final void testDetermineNewLeasesToCreateGraphBNoInitialLeasesAtTimestamp() { - List shards = constructShardListForGraphB(); - List currentLeases = new ArrayList(); - final LeaseSynchronizer leaseSynchronizer = getLeaseSynchronizer(shards, currentLeases); - List newLeases = - shardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, INITIAL_POSITION_AT_TIMESTAMP); - Map expectedShardIdCheckpointMap = - new HashMap(); - for (int i = 0; i < shards.size(); i++) { - String expectedShardId = "shardId-" + i; - expectedShardIdCheckpointMap.put(expectedShardId, ExtendedSequenceNumber.AT_TIMESTAMP); - } - - Assert.assertEquals(expectedShardIdCheckpointMap.size(), newLeases.size()); - for (KinesisClientLease lease : newLeases) { - Assert.assertTrue("Unexpected lease: " + lease, - expectedShardIdCheckpointMap.containsKey(lease.getLeaseKey())); - Assert.assertEquals(expectedShardIdCheckpointMap.get(lease.getLeaseKey()), lease.getCheckpoint()); - } + public void testDetermineNewLeasesToCreateSplitMergeHorizonA_PartialHashRange3() { + final List shards = constructShardListForGraphA(); + final List shardIdsOfCurrentLeases = Arrays.asList("shardId-2", "shardId-6"); + final Map expectedShardIdCheckpointMap = new HashMap<>(); + expectedShardIdCheckpointMap.put("shardId-3", ExtendedSequenceNumber.TRIM_HORIZON); + expectedShardIdCheckpointMap.put("shardId-4", ExtendedSequenceNumber.TRIM_HORIZON); + expectedShardIdCheckpointMap.put("shardId-5", ExtendedSequenceNumber.TRIM_HORIZON); + 
testCheckIfDescendantAndAddNewLeasesForAncestors(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_TRIM_HORIZON, expectedShardIdCheckpointMap); } + /** + * Test CheckIfDescendantAndAddNewLeasesForAncestors + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Current leases: (4, 9, 10) + * Initial position: TRIM_HORIZON + * Expected leases: (0, 1, 2, 3) + */ + @Test + public void testDetermineNewLeasesToCreateSplitMergeHorizonA_PartialHashRange4() { + final List shards = constructShardListForGraphA(); + final List shardIdsOfCurrentLeases = Arrays.asList("shardId-4", "shardId-9", "shardId-10"); + final Map expectedShardIdCheckpointMap = new HashMap<>(); + expectedShardIdCheckpointMap.put("shardId-0", ExtendedSequenceNumber.TRIM_HORIZON); + expectedShardIdCheckpointMap.put("shardId-1", ExtendedSequenceNumber.TRIM_HORIZON); + expectedShardIdCheckpointMap.put("shardId-2", ExtendedSequenceNumber.TRIM_HORIZON); + expectedShardIdCheckpointMap.put("shardId-3", ExtendedSequenceNumber.TRIM_HORIZON); + testCheckIfDescendantAndAddNewLeasesForAncestors(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_TRIM_HORIZON, expectedShardIdCheckpointMap); + } - /* + /** + * Test CheckIfDescendantAndAddNewLeasesForAncestors + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Current leases: (4, 5, 6, 7) + * Initial position: TRIM_HORIZON + * Expected leases: empty set + */ + @Test + public void testDetermineNewLeasesToCreateSplitMergeHorizonA_CompleteHashRange() { + final List shards = constructShardListForGraphA(); + final List shardIdsOfCurrentLeases = Arrays.asList("shardId-4", "shardId-5", "shardId-6", 
"shardId-7"); + final Map expectedNoNewLeases = Collections.emptyMap(); + testCheckIfDescendantAndAddNewLeasesForAncestors(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_TRIM_HORIZON, expectedNoNewLeases); + } + + /** + * Test CheckIfDescendantAndAddNewLeasesForAncestors + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Current leases: (0, 1, 2, 3, 4, 5, 6, 7) + * Initial position: TRIM_HORIZON + * Expected leases: empty set + */ + @Test + public void testDetermineNewLeasesToCreateSplitMergeHorizonA_CompleteHashRangeWithoutGC() { + final List shards = constructShardListForGraphA(); + final List shardIdsOfCurrentLeases = Arrays.asList("shardId-0", "shardId-1", "shardId-2", "shardId-3", + "shardId-4", "shardId-5", "shardId-6", "shardId-7"); + final Map expectedNoNewLeases = Collections.emptyMap(); + testCheckIfDescendantAndAddNewLeasesForAncestors(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_TRIM_HORIZON, expectedNoNewLeases); + } + + /** + * Test CheckIfDescendantAndAddNewLeasesForAncestors + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Current leases: empty set + * Initial position: TRIM_HORIZON + * Expected leases: (0, 1, 2, 3, 4, 5) + */ + @Test + public void testDetermineNewLeasesToCreateSplitMergeHorizonA_EmptyLeaseTable() { + final List shards = constructShardListForGraphA(); + final List shardIdsOfCurrentLeases = Collections.emptyList(); + final Map expectedShardIdCheckpointMap = new HashMap<>(); + expectedShardIdCheckpointMap.put("shardId-0", ExtendedSequenceNumber.TRIM_HORIZON); + expectedShardIdCheckpointMap.put("shardId-1", ExtendedSequenceNumber.TRIM_HORIZON); + 
expectedShardIdCheckpointMap.put("shardId-2", ExtendedSequenceNumber.TRIM_HORIZON); + expectedShardIdCheckpointMap.put("shardId-3", ExtendedSequenceNumber.TRIM_HORIZON); + expectedShardIdCheckpointMap.put("shardId-4", ExtendedSequenceNumber.TRIM_HORIZON); + expectedShardIdCheckpointMap.put("shardId-5", ExtendedSequenceNumber.TRIM_HORIZON); + testCheckIfDescendantAndAddNewLeasesForAncestors(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_TRIM_HORIZON, expectedShardIdCheckpointMap); + } + + /** + * Test CheckIfDescendantAndAddNewLeasesForAncestors + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Current leases: (0, 1, 4, 7, 9, 10) + * Initial position: TRIM_HORIZON + * Expected leases: empty set + */ + @Test + public void testDetermineNewLeasesToCreateSplitMergeHorizonA_CompleteHashRangeAcrossDifferentEpochs() { + final List shards = constructShardListForGraphA(); + final List shardIdsOfCurrentLeases = Arrays.asList("shardId-0", "shardId-1", "shardId-4", "shardId-7", + "shardId-9", "shardId-10"); + final Map expectedNoNewLeases = Collections.emptyMap(); + testCheckIfDescendantAndAddNewLeasesForAncestors(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_TRIM_HORIZON, expectedNoNewLeases); + } + + /** + * Shard structure (x-axis is epochs): + * 0 1 shards till epoch 1049 + * \ / + * 2 shards from epoch 1050 - 1099 + * / \ + * 3 4 shards from epoch 1100 - 1149 + * \ / + * 5 shards from epoch 1150 - 1199 + * / \ + * 6 7 shards from epoch 1200 - 1249 + * \ / + * 8 shards from epoch 1250 - 1299 + * / \ + * 9 10 shards from epoch 1300 (open - no ending sequence number) + * + * Current leases: (6) + * Initial position: TRIM_HORIZON + * Expected leases: (7) + */ +// TODO: Account for out-of-order lease creation in TRIM_HORIZON and AT_TIMESTAMP cases +// @Test +// public void 
testDetermineNewLeasesToCreateSplitMergeHorizonB_PartialHashRange() { +// final List shards = constructShardListForGraphB(); +// final List shardIdsOfCurrentLeases = Arrays.asList("shardId-6"); +// final Map expectedShardIdCheckpointMap = new HashMap<>(); +// expectedShardIdCheckpointMap.put("shardId-7", ExtendedSequenceNumber.TRIM_HORIZON); +// testCheckIfDescendantAndAddNewLeasesForAncestors(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_TRIM_HORIZON, expectedShardIdCheckpointMap); +// } + + /** + * Shard structure (x-axis is epochs): + * 0 1 shards till epoch 1049 + * \ / + * 2 shards from epoch 1050 - 1099 + * / \ + * 3 4 shards from epoch 1100 - 1149 + * \ / + * 5 shards from epoch 1150 - 1199 + * / \ + * 6 7 shards from epoch 1200 - 1249 + * \ / + * 8 shards from epoch 1250 - 1299 + * / \ + * 9 10 shards from epoch 1300 (open - no ending sequence number) + * + * Current leases: (5) + * Initial position: TRIM_HORIZON + * Expected leases: empty set + */ + @Test + public void testDetermineNewLeasesToCreateSplitMergeHorizonB_CompleteHashRange() { + final List shards = constructShardListForGraphB(); + final List shardIdsOfCurrentLeases = Arrays.asList("shardId-5"); + final Map expectedNoNewLeases = Collections.emptyMap(); + testCheckIfDescendantAndAddNewLeasesForAncestors(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_TRIM_HORIZON, expectedNoNewLeases); + } + + /** + * Shard structure (x-axis is epochs): + * 0 1 shards till epoch 1049 + * \ / + * 2 shards from epoch 1050 - 1099 + * / \ + * 3 4 shards from epoch 1100 - 1149 + * \ / + * 5 shards from epoch 1150 - 1199 + * / \ + * 6 7 shards from epoch 1200 - 1249 + * \ / + * 8 shards from epoch 1250 - 1299 + * / \ + * 9 10 shards from epoch 1300 (open - no ending sequence number) + * + * Current leases: (0, 1, 2, 3, 4, 5) + * Initial position: TRIM_HORIZON + * Expected leases: empty set + */ + @Test + public void testDetermineNewLeasesToCreateSplitMergeHorizonB_CompleteHashRangeWithoutGC() { + final List 
shards = constructShardListForGraphB(); + final List shardIdsOfCurrentLeases = Arrays.asList("shardId-0", "shardId-1", "shardId-2", "shardId-3", + "shardId-4", "shardId-5"); + final Map expectedNoNewLeases = Collections.emptyMap(); + testCheckIfDescendantAndAddNewLeasesForAncestors(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_TRIM_HORIZON, expectedNoNewLeases); + } + + /** + * CheckIfDescendantAndAddNewLeasesForAncestors + * Shard structure (x-axis is epochs): + * 0 1 shards till epoch 1049 + * \ / + * 2 shards from epoch 1050 - 1099 + * / \ + * 3 4 shards from epoch 1100 - 1149 + * \ / + * 5 shards from epoch 1150 - 1199 + * / \ + * 6 7 shards from epoch 1200 - 1249 + * \ / + * 8 shards from epoch 1250 - 1299 + * / \ + * 9 10 shards from epoch 1300 (open - no ending sequence number) + * + * Current leases: empty set + * Initial position: TRIM_HORIZON + * Expected leases: (0, 1) + */ + @Test + public void testDetermineNewLeasesToCreateSplitMergeHorizonB_EmptyLeaseTable() { + final List shards = constructShardListForGraphB(); + final List shardIdsOfCurrentLeases = Collections.emptyList(); + final Map expectedShardIdCheckpointMap = new HashMap<>(); + expectedShardIdCheckpointMap.put("shardId-0", ExtendedSequenceNumber.TRIM_HORIZON); + expectedShardIdCheckpointMap.put("shardId-1", ExtendedSequenceNumber.TRIM_HORIZON); + testCheckIfDescendantAndAddNewLeasesForAncestors(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_TRIM_HORIZON, expectedShardIdCheckpointMap); + } + + /** + * Test CheckIfDescendantAndAddNewLeasesForAncestors + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Current leases: (3, 4, 5) + * Initial position: AT_TIMESTAMP(1000) + * Expected leases: (0, 1, 2) + */ + @Test + public void testDetermineNewLeasesToCreateSplitMergeAtTimestampA_PartialHashRange1() { + 
final List shards = constructShardListForGraphA(); + final List shardIdsOfCurrentLeases = Arrays.asList("shardId-3", "shardId-4", "shardId-5"); + final Map expectedShardIdCheckpointMap = new HashMap<>(); + expectedShardIdCheckpointMap.put("shardId-0", ExtendedSequenceNumber.AT_TIMESTAMP); + expectedShardIdCheckpointMap.put("shardId-1", ExtendedSequenceNumber.AT_TIMESTAMP); + expectedShardIdCheckpointMap.put("shardId-2", ExtendedSequenceNumber.AT_TIMESTAMP); + testCheckIfDescendantAndAddNewLeasesForAncestors(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_AT_TIMESTAMP, expectedShardIdCheckpointMap); + } + + /** + * Test CheckIfDescendantAndAddNewLeasesForAncestors + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Current leases: (4, 5, 7) + * Initial position: AT_TIMESTAMP(1000) + * Expected leases: (0, 1) + */ + @Test + public void testDetermineNewLeasesToCreateSplitMergeAtTimestampA_PartialHashRange2() { + final List shards = constructShardListForGraphA(); + final List shardIdsOfCurrentLeases = Arrays.asList("shardId-4", "shardId-5", "shardId-7"); + final Map expectedShardIdCheckpointMap = new HashMap<>(); + expectedShardIdCheckpointMap.put("shardId-0", ExtendedSequenceNumber.AT_TIMESTAMP); + expectedShardIdCheckpointMap.put("shardId-1", ExtendedSequenceNumber.AT_TIMESTAMP); + testCheckIfDescendantAndAddNewLeasesForAncestors(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_AT_TIMESTAMP, expectedShardIdCheckpointMap); + } + + /** + * Test CheckIfDescendantAndAddNewLeasesForAncestors + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Current leases: (2, 6) + * Initial position: 
AT_TIMESTAMP(1000) + * Expected leases: (3, 4, 5) + */ + @Test + public void testDetermineNewLeasesToCreateSplitMergeAtTimestampA_PartialHashRange3() { + final List shards = constructShardListForGraphA(); + final List shardIdsOfCurrentLeases = Arrays.asList("shardId-2", "shardId-6"); + final Map expectedShardIdCheckpointMap = new HashMap<>(); + expectedShardIdCheckpointMap.put("shardId-3", ExtendedSequenceNumber.AT_TIMESTAMP); + expectedShardIdCheckpointMap.put("shardId-4", ExtendedSequenceNumber.AT_TIMESTAMP); + expectedShardIdCheckpointMap.put("shardId-5", ExtendedSequenceNumber.AT_TIMESTAMP); + testCheckIfDescendantAndAddNewLeasesForAncestors(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_AT_TIMESTAMP, expectedShardIdCheckpointMap); + } + + /** + * Test CheckIfDescendantAndAddNewLeasesForAncestors + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Current leases: (4, 9, 10) + * Initial position: AT_TIMESTAMP(1000) + * Expected leases: (0, 1, 2, 3) + */ + @Test + public void testDetermineNewLeasesToCreateSplitMergeAtTimestampA_PartialHashRange4() { + final List shards = constructShardListForGraphA(); + final List shardIdsOfCurrentLeases = Arrays.asList("shardId-4", "shardId-9", "shardId-10"); + final Map expectedShardIdCheckpointMap = new HashMap<>(); + expectedShardIdCheckpointMap.put("shardId-0", ExtendedSequenceNumber.AT_TIMESTAMP); + expectedShardIdCheckpointMap.put("shardId-1", ExtendedSequenceNumber.AT_TIMESTAMP); + expectedShardIdCheckpointMap.put("shardId-2", ExtendedSequenceNumber.AT_TIMESTAMP); + expectedShardIdCheckpointMap.put("shardId-3", ExtendedSequenceNumber.AT_TIMESTAMP); + testCheckIfDescendantAndAddNewLeasesForAncestors(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_AT_TIMESTAMP, expectedShardIdCheckpointMap); + } + + /** + * Test 
CheckIfDescendantAndAddNewLeasesForAncestors + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Current leases: (4, 5, 6, 7) + * Initial position: AT_TIMESTAMP(1000) + * Expected leases: empty set + */ + @Test + public void testDetermineNewLeasesToCreateSplitMergeAtTimestampA_CompleteHashRange() { + final List shards = constructShardListForGraphA(); + final List shardIdsOfCurrentLeases = Arrays.asList("shardId-4", "shardId-5", "shardId-6", "shardId-7"); + final Map expectedNoNewLeases = Collections.emptyMap(); + testCheckIfDescendantAndAddNewLeasesForAncestors(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_AT_TIMESTAMP, expectedNoNewLeases); + } + + /** + * Test CheckIfDescendantAndAddNewLeasesForAncestors + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Current leases: (0, 1, 2, 3, 4, 5, 6, 7) + * Initial position: AT_TIMESTAMP(1000) + * Expected leases: empty set + */ + @Test + public void testDetermineNewLeasesToCreateSplitMergeAtTimestampA_CompleteHashRangeWithoutGC() { + final List shards = constructShardListForGraphA(); + final List shardIdsOfCurrentLeases = Arrays.asList("shardId-0", "shardId-1", "shardId-2", "shardId-3", + "shardId-4", "shardId-5", "shardId-6", "shardId-7"); + final Map expectedNoNewLeases = Collections.emptyMap(); + testCheckIfDescendantAndAddNewLeasesForAncestors(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_AT_TIMESTAMP, expectedNoNewLeases); + } + + /** + * Test CheckIfDescendantAndAddNewLeasesForAncestors + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 
103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Current leases: empty set + * Initial position: AT_TIMESTAMP(1000) + * Expected leases: (0, 1, 2, 3, 4, 5) */ + @Test + public void testDetermineNewLeasesToCreateSplitMergeAtTimestampA_EmptyLeaseTable() { + final List shards = constructShardListForGraphA(); + final List shardIdsOfCurrentLeases = Collections.emptyList(); + final Map expectedShardIdCheckpointMap = new HashMap<>(); + expectedShardIdCheckpointMap.put("shardId-0", ExtendedSequenceNumber.AT_TIMESTAMP); + expectedShardIdCheckpointMap.put("shardId-1", ExtendedSequenceNumber.AT_TIMESTAMP); + expectedShardIdCheckpointMap.put("shardId-2", ExtendedSequenceNumber.AT_TIMESTAMP); + expectedShardIdCheckpointMap.put("shardId-3", ExtendedSequenceNumber.AT_TIMESTAMP); + expectedShardIdCheckpointMap.put("shardId-4", ExtendedSequenceNumber.AT_TIMESTAMP); + expectedShardIdCheckpointMap.put("shardId-5", ExtendedSequenceNumber.AT_TIMESTAMP); + testCheckIfDescendantAndAddNewLeasesForAncestors(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_AT_TIMESTAMP, expectedShardIdCheckpointMap); + } + + /** + * Test CheckIfDescendantAndAddNewLeasesForAncestors + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Current leases: (0, 1, 4, 7, 9, 10) + * Initial position: AT_TIMESTAMP(1000) + * Expected leases: empty set + */ + @Test + public void testDetermineNewLeasesToCreateSplitMergeAtTimestampA_CompleteHashRangeAcrossDifferentEpochs() { + final List shards = constructShardListForGraphA(); + final List shardIdsOfCurrentLeases = Arrays.asList("shardId-0", "shardId-1", "shardId-4", "shardId-7", + "shardId-9", "shardId-10"); + final Map expectedNoNewLeases = Collections.emptyMap(); + testCheckIfDescendantAndAddNewLeasesForAncestors(shards, 
shardIdsOfCurrentLeases, INITIAL_POSITION_AT_TIMESTAMP, expectedNoNewLeases); + } + + /** + * Shard structure (x-axis is epochs): + * 0 1 shards till epoch 1049 + * \ / + * 2 shards from epoch 1050 - 1099 + * / \ + * 3 4 shards from epoch 1100 - 1149 + * \ / + * 5 shards from epoch 1150 - 1199 + * / \ + * 6 7 shards from epoch 1200 - 1249 + * \ / + * 8 shards from epoch 1250 - 1299 + * / \ + * 9 10 shards from epoch 1300 (open - no ending sequence number) + * + * Current leases: (6) + * Initial position: AT_TIMESTAMP(1000) + * Expected leases: (7) + */ +// TODO: Account for out-of-order lease creation in TRIM_HORIZON and AT_TIMESTAMP cases +// @Test +// public void testDetermineNewLeasesToCreateSplitMergeAtTimestampB_PartialHashRange() { +// final List shards = constructShardListForGraphB(); +// final List shardIdsOfCurrentLeases = Arrays.asList("shardId-6"); +// final Map expectedShardIdCheckpointMap = new HashMap<>(); +// expectedShardIdCheckpointMap.put("shardId-7", ExtendedSequenceNumber.AT_TIMESTAMP); +// testCheckIfDescendantAndAddNewLeasesForAncestors(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_AT_TIMESTAMP, expectedShardIdCheckpointMap); +// } + + /** + * Shard structure (x-axis is epochs): + * 0 1 shards till epoch 1049 + * \ / + * 2 shards from epoch 1050 - 1099 + * / \ + * 3 4 shards from epoch 1100 - 1149 + * \ / + * 5 shards from epoch 1150 - 1199 + * / \ + * 6 7 shards from epoch 1200 - 1249 + * \ / + * 8 shards from epoch 1250 - 1299 + * / \ + * 9 10 shards from epoch 1300 (open - no ending sequence number) + * + * Current leases: (5) + * Initial position: AT_TIMESTAMP(1000) + * Expected leases: empty set + */ + @Test + public void testDetermineNewLeasesToCreateSplitMergeAtTimestampB_CompleteHashRange() { + final List shards = constructShardListForGraphB(); + final List shardIdsOfCurrentLeases = Arrays.asList("shardId-5"); + final Map expectedNoNewLeases = Collections.emptyMap(); + testCheckIfDescendantAndAddNewLeasesForAncestors(shards, 
shardIdsOfCurrentLeases, INITIAL_POSITION_AT_TIMESTAMP, expectedNoNewLeases); + } + + /** + * Shard structure (x-axis is epochs): + * 0 1 shards till epoch 1049 + * \ / + * 2 shards from epoch 1050 - 1099 + * / \ + * 3 4 shards from epoch 1100 - 1149 + * \ / + * 5 shards from epoch 1150 - 1199 + * / \ + * 6 7 shards from epoch 1200 - 1249 + * \ / + * 8 shards from epoch 1250 - 1299 + * / \ + * 9 10 shards from epoch 1300 (open - no ending sequence number) + * + * Current leases: (0, 1, 2, 3, 4, 5) + * Initial position: AT_TIMESTAMP(1000) + * Expected leases: empty set + */ + @Test + public void testDetermineNewLeasesToCreateSplitMergeAtTimestampB_CompleteHashRangeWithoutGC() { + final List shards = constructShardListForGraphB(); + final List shardIdsOfCurrentLeases = Arrays.asList("shardId-0", "shardId-1", "shardId-2", "shardId-3", + "shardId-4", "shardId-5"); + final Map expectedNoNewLeases = Collections.emptyMap(); + testCheckIfDescendantAndAddNewLeasesForAncestors(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_AT_TIMESTAMP, expectedNoNewLeases); + } + + /** + * Shard structure (x-axis is epochs): + * 0 1 shards till epoch 1049 + * \ / + * 2 shards from epoch 1050 - 1099 + * / \ + * 3 4 shards from epoch 1100 - 1149 + * \ / + * 5 shards from epoch 1150 - 1199 + * / \ + * 6 7 shards from epoch 1200 - 1249 + * \ / + * 8 shards from epoch 1250 - 1299 + * / \ + * 9 10 shards from epoch 1300 (open - no ending sequence number) + * + * Current leases: empty set + * Initial position: AT_TIMESTAMP(1000) + * Expected leases: (0, 1) + */ + @Test + public void testDetermineNewLeasesToCreateSplitMergeAtTimestampB_EmptyLeaseTable() { + final List shards = constructShardListForGraphB(); + final List shardIdsOfCurrentLeases = Collections.emptyList(); + final Map expectedShardIdCheckpointMap = new HashMap<>(); + expectedShardIdCheckpointMap.put("shardId-0", ExtendedSequenceNumber.AT_TIMESTAMP); + expectedShardIdCheckpointMap.put("shardId-1", 
ExtendedSequenceNumber.AT_TIMESTAMP); + testCheckIfDescendantAndAddNewLeasesForAncestors(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_AT_TIMESTAMP, expectedShardIdCheckpointMap); + } + + /** * Helper method to construct a shard list for graph A. Graph A is defined below. * Shard structure (y-axis is epochs): - * 0 1 2 3 4 5- shards till epoch 102 - * \ / \ / | | - * 6 7 4 5- shards from epoch 103 - 205 - * \ / | /\ - * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 210 + * \ / | /\ + * 8 4 9 10 - shards from epoch 211 (open - no ending sequenceNumber) */ List constructShardListForGraphA() { List shards = new ArrayList(); @@ -1175,14 +1819,22 @@ public class ShardSyncerTest { return shards; } - /* + /** * Helper method to construct a shard list for graph B. Graph B is defined below. * Shard structure (x-axis is epochs): - * 0 3 6 9 - * \ / \ / \ / - * 2 5 8 - * / \ / \ / \ - * 1 4 7 10 + * 0 1 shards till epoch 1049 + * \ / + * 2 shards from epoch 1050 - 1099 + * / \ + * 3 4 shards from epoch 1100 - 1149 + * \ / + * 5 shards from epoch 1150 - 1199 + * / \ + * 6 7 shards from epoch 1200 - 1249 + * \ / + * 8 shards from epoch 1250 - 1299 + * / \ + * 9 10 shards from epoch 1300 (open - no ending sequence number) */ List constructShardListForGraphB() { List shards = new ArrayList(); @@ -1214,12 +1866,54 @@ public class ShardSyncerTest { return shards; } + /** + * Helper method to construct a shard list for graph C. Graph C is defined below. 
Shard structure (y-axis is + * epochs): 0 1 2 3 - shards till + * / \ | \ / + * 4 5 1 6 - shards from epoch 103 - 205 + * / \ / \ | | + * 7 8 9 10 1 6 + * shards from epoch 206 (open - no ending sequenceNumber) + */ + private List constructShardListForGraphC() { + final SequenceNumberRange range0 = ShardObjectHelper.newSequenceNumberRange("11", "102"); + final SequenceNumberRange range1 = ShardObjectHelper.newSequenceNumberRange("11", null); + final SequenceNumberRange range2 = ShardObjectHelper.newSequenceNumberRange("103", null); + final SequenceNumberRange range3 = ShardObjectHelper.newSequenceNumberRange("103", "205"); + final SequenceNumberRange range4 = ShardObjectHelper.newSequenceNumberRange("206", null); + + return Arrays.asList( + ShardObjectHelper.newShard("shardId-0", null, null, range0, + ShardObjectHelper.newHashKeyRange("0", "399")), + ShardObjectHelper.newShard("shardId-1", null, null, range1, + ShardObjectHelper.newHashKeyRange("400", "499")), + ShardObjectHelper.newShard("shardId-2", null, null, range0, + ShardObjectHelper.newHashKeyRange("500", "599")), + ShardObjectHelper.newShard("shardId-3", null, null, range0, + ShardObjectHelper.newHashKeyRange("600", ShardObjectHelper.MAX_HASH_KEY)), + ShardObjectHelper.newShard("shardId-4", "shardId-0", null, range3, + ShardObjectHelper.newHashKeyRange("0", "199")), + ShardObjectHelper.newShard("shardId-5", "shardId-0", null, range3, + ShardObjectHelper.newHashKeyRange("200", "399")), + ShardObjectHelper.newShard("shardId-6", "shardId-2", "shardId-3", range2, + ShardObjectHelper.newHashKeyRange("500", ShardObjectHelper.MAX_HASH_KEY)), + ShardObjectHelper.newShard("shardId-7", "shardId-4", null, range4, + ShardObjectHelper.newHashKeyRange("0", "99")), + ShardObjectHelper.newShard("shardId-8", "shardId-4", null, range4, + ShardObjectHelper.newHashKeyRange("100", "199")), + ShardObjectHelper.newShard("shardId-9", "shardId-5", null, range4, + ShardObjectHelper.newHashKeyRange("200", "299")), + 
ShardObjectHelper.newShard("shardId-10", "shardId-5", null, range4, + ShardObjectHelper.newHashKeyRange("300", "399"))); + } + + /** * Test CheckIfDescendantAndAddNewLeasesForAncestors when shardId is null */ @Test public final void testCheckIfDescendantAndAddNewLeasesForAncestorsNullShardId() { - Map memoizationContext = new HashMap<>(); + final MemoizationContext memoizationContext = new MemoizationContext(); Assert.assertFalse(shardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(null, INITIAL_POSITION_LATEST, null, null, @@ -1234,7 +1928,7 @@ public class ShardSyncerTest { public final void testCheckIfDescendantAndAddNewLeasesForAncestorsTrimmedShard() { String shardId = "shardId-trimmed"; Map kinesisShards = new HashMap(); - Map memoizationContext = new HashMap<>(); + final MemoizationContext memoizationContext = new MemoizationContext(); Assert.assertFalse(shardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(shardId, INITIAL_POSITION_LATEST, null, kinesisShards, @@ -1253,7 +1947,7 @@ public class ShardSyncerTest { Set shardIdsOfCurrentLeases = new HashSet(); shardIdsOfCurrentLeases.add(shardId); Map newLeaseMap = new HashMap(); - Map memoizationContext = new HashMap<>(); + final MemoizationContext memoizationContext = new MemoizationContext(); Assert.assertTrue(shardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(shardId, INITIAL_POSITION_LATEST, shardIdsOfCurrentLeases, kinesisShards, @@ -1280,7 +1974,7 @@ public class ShardSyncerTest { String shardId = "shardId-9-1"; kinesisShards.put(shardId, ShardObjectHelper.newShard(shardId, parentShardId, adjacentParentShardId, null)); - Map memoizationContext = new HashMap<>(); + final MemoizationContext memoizationContext = new MemoizationContext(); Assert.assertFalse(shardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(shardId, INITIAL_POSITION_LATEST, shardIdsOfCurrentLeases, kinesisShards, @@ -1309,7 +2003,7 @@ public class ShardSyncerTest { Shard shard = ShardObjectHelper.newShard(shardId, 
parentShardId, adjacentParentShardId, null); kinesisShards.put(shardId, shard); - Map memoizationContext = new HashMap<>(); + final MemoizationContext memoizationContext = new MemoizationContext(); Assert.assertTrue(shardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(shardId, INITIAL_POSITION_LATEST, shardIdsOfCurrentLeases, kinesisShards, @@ -1857,6 +2551,244 @@ public class ShardSyncerTest { shardSyncer.assertClosedShardsAreCoveredOrAbsent(shardIdToShardMap, shardIdToChildShardIdsMap, closedShardIds); } + /* + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Initial position: TRIM_HORIZON + * Leases to create: (0, 1, 2, 3, 4, 5) + */ + @Test + public void testCheckAndCreateLeasesForNewShardsAtTrimHorizonWithEmptyLeaseTable() throws Exception { + final List shards = constructShardListForGraphA(); + final Set expectedLeaseKeysToCreate = new HashSet<>(Arrays.asList("shardId-0", "shardId-1", "shardId-2", + "shardId-3", "shardId-4", "shardId-5")); + testCheckAndCreateLeaseForShardsIfMissing(shards, INITIAL_POSITION_TRIM_HORIZON, expectedLeaseKeysToCreate); + } + + /* + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Initial position: AT_TIMESTAMP(1000) + * Leases to create: (8, 4, 9, 10) + */ + @Test + public void testCheckAndCreateLeasesForNewShardsAtTimestampWithEmptyLeaseTable1() throws Exception { + final List shards = constructShardListForGraphA(); + final Set expectedLeaseKeysToCreate = new HashSet<>(Arrays.asList("shardId-8", "shardId-4", "shardId-9", + "shardId-10")); + testCheckAndCreateLeaseForShardsIfMissing(shards, INITIAL_POSITION_AT_TIMESTAMP, expectedLeaseKeysToCreate); 
+ } + + /* + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Initial position: AT_TIMESTAMP(200) + * Leases to create: (6, 7, 4, 5) + */ + @Test + public void testCheckAndCreateLeasesForNewShardsAtTimestampWithEmptyLeaseTable2() throws Exception { + final List shards = constructShardListForGraphA(); + final Set expectedLeaseKeysToCreate = new HashSet<>(Arrays.asList("shardId-6", "shardId-7", "shardId-4", + "shardId-5")); + final InitialPositionInStreamExtended initialPosition = InitialPositionInStreamExtended + .newInitialPositionAtTimestamp(new Date(200L)); + testCheckAndCreateLeaseForShardsIfMissing(shards, initialPosition, expectedLeaseKeysToCreate); + } + + /* + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Initial position: LATEST + * Leases to create: (8, 4, 9, 10) + */ + @Test + public void testCheckAndCreateLeasesForNewShardsAtLatestWithEmptyLeaseTable2() throws Exception { + final List shards = constructShardListForGraphA(); + final Set expectedLeaseKeysToCreate = new HashSet<>(Arrays.asList("shardId-8", "shardId-4", "shardId-9", + "shardId-10")); + testCheckAndCreateLeaseForShardsIfMissing(shards, INITIAL_POSITION_LATEST, expectedLeaseKeysToCreate); + } + + private void testCheckAndCreateLeaseForShardsIfMissing(List shards,InitialPositionInStreamExtended initialPositionInStreamExtended, + Set expectedLeaseKeys) throws Exception { + testCheckAndCreateLeaseForShardsIfMissing(shards, initialPositionInStreamExtended, expectedLeaseKeys, Collections.emptyList()); + } + + private void testCheckAndCreateLeaseForShardsIfMissing(List shards,InitialPositionInStreamExtended 
initialPositionInStreamExtended, + Set expectedLeaseKeys, List existingLeases) throws Exception { + final File dataFile = KinesisLocalFileDataCreator.generateTempDataFile(shards, 0, "fileName"); + dataFile.deleteOnExit(); + final IKinesisProxy kinesisProxy = spy(new KinesisLocalFileProxy(dataFile.getAbsolutePath())); + when(kinesisProxy.getShardList()).thenReturn(shards); + when(kinesisProxy.getShardListWithFilter(any())).thenReturn(getFilteredShards(shards, initialPositionInStreamExtended)); + + // Populate existing leases + for (KinesisClientLease lease : existingLeases) { + leaseManager.createLeaseIfNotExists(lease); + } + + List oldLeases = leaseManager.listLeases(); + shardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, initialPositionInStreamExtended, + false, false); + List newLeases = leaseManager.listLeases(); + newLeases.removeAll(oldLeases); + + final Set newLeaseKeys = newLeases.stream().map(Lease::getLeaseKey).collect(Collectors.toSet()); + final Set newSequenceNumbers = newLeases.stream().map(KinesisClientLease::getCheckpoint).collect(Collectors.toSet()); + final Set expectedSequenceNumbers = new HashSet<>(Collections + .singletonList(new ExtendedSequenceNumber(initialPositionInStreamExtended.getInitialPositionInStream().name()))); + + assertThat(newLeases.size(), equalTo(expectedLeaseKeys.size())); + assertThat(newLeaseKeys, equalTo(expectedLeaseKeys)); + assertThat(newSequenceNumbers, equalTo(expectedSequenceNumbers)); + + dataFile.delete(); + } + + /* + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Missing leases: (0, 6, 8) + * Initial position: TRIM_HORIZON + * Leases to create: (0) + */ + @Test + public void testCheckAndCreateLeasesForNewShardsAtTrimHorizonWithPartialLeaseTable() throws Exception { + final List shards = 
constructShardListForGraphA(); + // Leases for shard-0 and its descendants (shard-6, and shard-8) are missing. Expect lease sync to recover the + // lease for shard-0 when reading from TRIM_HORIZON. + final Set missingLeaseKeys = new HashSet<>(Arrays.asList("shardId-0", "shardId-6", "shardId-8")); + final List shardsWithLeases = shards.stream() + .filter(s -> !missingLeaseKeys.contains(s.getShardId())).collect(Collectors.toList()); + final List existingLeases = createLeasesFromShards(shardsWithLeases, ExtendedSequenceNumber.TRIM_HORIZON, LEASE_OWNER); + + final Set expectedLeaseKeysToCreate = new HashSet<>(Arrays.asList("shardId-0")); + testCheckAndCreateLeaseForShardsIfMissing(shards, INITIAL_POSITION_TRIM_HORIZON, expectedLeaseKeysToCreate, existingLeases); + } + + + /* + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Missing leases: (0, 6, 8) + * Initial position: AT_TIMESTAMP(1000) + * Leases to create: (0) + */ + @Test + public void testCheckAndCreateLeasesForNewShardsAtTimestampWithPartialLeaseTable1() throws Exception { + final List shards = constructShardListForGraphA(); + // Leases for shard-0 and its descendants (shard-6, and shard-8) are missing. Expect lease sync to recover the + // lease for shard-0 when reading from AT_TIMESTAMP. 
+ final Set missingLeaseKeys = new HashSet<>(Arrays.asList("shardId-0", "shardId-6", "shardId-8")); + final List shardsWithLeases = shards.stream() + .filter(s -> !missingLeaseKeys.contains(s.getShardId())).collect(Collectors.toList()); + final List existingLeases = createLeasesFromShards(shardsWithLeases, ExtendedSequenceNumber.AT_TIMESTAMP, LEASE_OWNER); + + final Set expectedLeaseKeys= new HashSet<>(Arrays.asList("shardId-0")); + testCheckAndCreateLeaseForShardsIfMissing(shards, INITIAL_POSITION_AT_TIMESTAMP, expectedLeaseKeys, existingLeases); + } + + /* + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Missing leases: (0, 6, 8) + * Initial position: AT_TIMESTAMP(200) + * Leases to create: (0) + */ + @Test + public void testCheckAndCreateLeasesForNewShardsAtTimestampWithPartialLeaseTable2() throws Exception { + final List shards = constructShardListForGraphA(); + final InitialPositionInStreamExtended initialPosition = InitialPositionInStreamExtended + .newInitialPositionAtTimestamp(new Date(200L)); + // Leases for shard-0 and its descendants (shard-6, and shard-8) are missing. Expect lease sync to recover the + // lease for shard-0 when reading from AT_TIMESTAMP. 
+ final Set missingLeaseKeys = new HashSet<>(Arrays.asList("shardId-0", "shardId-6", "shardId-8")); + final List shardsWithLeases = shards.stream() + .filter(s -> !missingLeaseKeys.contains(s.getShardId())).collect(Collectors.toList()); + final List existingLeases = createLeasesFromShards(shardsWithLeases, ExtendedSequenceNumber.AT_TIMESTAMP, LEASE_OWNER); + + final Set expectedLeaseKeysToCreate = new HashSet<>(Arrays.asList("shardId-0")); + testCheckAndCreateLeaseForShardsIfMissing(shards, initialPosition, expectedLeaseKeysToCreate, existingLeases); + } + + /* + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Missing leases: (0, 6, 8) + * Initial position: LATEST + * Leases to create: (0) + */ + @Test + public void testCheckAndCreateLeasesForNewShardsAtLatestWithPartialLeaseTable2() throws Exception { + final List shards = constructShardListForGraphA(); + // Leases for shard-0 and its descendants (shard-6, and shard-8) are missing. Expect lease sync to recover the + // lease for shard-0 when reading from LATEST. 
+ final Set missingLeaseKeys = new HashSet<>(Arrays.asList("shardId-0", "shardId-6", "shardId-8")); + final List shardsWithLeases = shards.stream() + .filter(s -> !missingLeaseKeys.contains(s.getShardId())).collect(Collectors.toList()); + final List existingLeases = createLeasesFromShards(shardsWithLeases, ExtendedSequenceNumber.LATEST, LEASE_OWNER); + + final Set expectedLeaseKeysToCreate = new HashSet<>(Arrays.asList("shardId-0")); + testCheckAndCreateLeaseForShardsIfMissing(shards, INITIAL_POSITION_LATEST, expectedLeaseKeysToCreate, existingLeases); + } + + + + private List createLeasesFromShards(final List shards, final ExtendedSequenceNumber checkpoint, + final String leaseOwner) { + return shards.stream().map(shard -> { + final Set parentShardIds = new HashSet<>(); + if (StringUtils.isNotEmpty(shard.getParentShardId())) { + parentShardIds.add(shard.getParentShardId()); + } + if (StringUtils.isNotEmpty(shard.getAdjacentParentShardId())) { + parentShardIds.add(shard.getAdjacentParentShardId()); + } + + final KinesisClientLease lease = new KinesisClientLease(); + lease.setLeaseKey(shard.getShardId()); + lease.setLeaseOwner(leaseOwner); + lease.setLeaseCounter(0L); + lease.setLastCounterIncrementNanos(0L); + lease.setCheckpoint(checkpoint); + lease.setOwnerSwitchesSinceCheckpoint(0L); + lease.setParentShardIds(parentShardIds); + + return lease; + }).collect(Collectors.toList()); + } /** * Helper method. * @@ -1870,6 +2802,33 @@ public class ShardSyncerTest { return lease; } + /** + * Helper method to test CheckIfDescendantAndAddNewLeasesForAncestors and verify new leases created with an expected result. 
+ * @param shards + * @param shardIdsOfCurrentLeases + * @param checkpoint + * @param expectedShardIdCheckpointMap + */ + private void testCheckIfDescendantAndAddNewLeasesForAncestors(List shards, List shardIdsOfCurrentLeases, + InitialPositionInStreamExtended checkpoint, Map expectedShardIdCheckpointMap) { + final List currentLeases = shardIdsOfCurrentLeases.stream() + .map(shardId -> newLease(shardId)).collect(Collectors.toList()); + final Map shardIdToShardMap = KinesisShardSyncer.constructShardIdToShardMap(shards); + final Map> shardIdToChildShardIdsMap = + KinesisShardSyncer.constructShardIdToChildShardIdsMap(shardIdToShardMap); + + final LeaseSynchronizer leaseSynchronizer = new NonEmptyLeaseTableSynchronizer(shardIdToShardMap, shardIdToChildShardIdsMap); + final List newLeases = + shardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, checkpoint); + + Assert.assertEquals(expectedShardIdCheckpointMap.size(), newLeases.size()); + for (KinesisClientLease lease : newLeases) { + Assert.assertTrue("Unexpected lease: " + lease, + expectedShardIdCheckpointMap.containsKey(lease.getLeaseKey())); + Assert.assertEquals(expectedShardIdCheckpointMap.get(lease.getLeaseKey()), lease.getCheckpoint()); + } + } + /** * Helper method to get appropriate LeaseSynchronizer based on available shards and current leases. If there are * no current leases (empty lease table case), return EmptyLeaseTableSynchronizer. Else, return @@ -1891,4 +2850,33 @@ public class ShardSyncerTest { return new NonEmptyLeaseTableSynchronizer(shardIdToShardMap, shardIdToChildShardIdsMap); } + + /** + * Helper method to mimic behavior of Kinesis ListShardsWithFilter calls. 
+ */ + private static List getFilteredShards(List shards, InitialPositionInStreamExtended initialPosition) { + switch (initialPosition.getInitialPositionInStream()) { + case LATEST: + return shards.stream() + .filter(s -> s.getSequenceNumberRange().getEndingSequenceNumber() == null) + .collect(Collectors.toList()); + case TRIM_HORIZON: + String minSeqNum = shards.stream() + .min(Comparator.comparingLong(s -> Long.parseLong(s.getSequenceNumberRange().getStartingSequenceNumber()))) + .map(s -> s.getSequenceNumberRange().getStartingSequenceNumber()) + .orElseThrow(RuntimeException::new); + return shards.stream() + .filter(s -> s.getSequenceNumberRange().getStartingSequenceNumber().equals(minSeqNum)) + .collect(Collectors.toList()); + case AT_TIMESTAMP: + return shards.stream() + .filter(s -> new Date(Long.parseLong(s.getSequenceNumberRange().getStartingSequenceNumber())) + .compareTo(initialPosition.getTimestamp()) <= 0) + .filter(s -> s.getSequenceNumberRange().getEndingSequenceNumber() == null || + new Date(Long.parseLong(s.getSequenceNumberRange().getEndingSequenceNumber())) + .compareTo(initialPosition.getTimestamp()) > 0) + .collect(Collectors.toList()); + } + throw new RuntimeException("Unsupported initial position " + initialPosition); + } }