diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java index f4143581..71567921 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java @@ -24,12 +24,14 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.function.BiFunction; import java.util.function.Function; import java.util.stream.Collectors; import com.google.common.annotations.VisibleForTesting; +import lombok.AllArgsConstructor; import lombok.Data; import lombok.experimental.Accessors; import org.apache.commons.lang3.StringUtils; @@ -38,6 +40,8 @@ import lombok.NonNull; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import software.amazon.awssdk.services.kinesis.model.Shard; +import software.amazon.awssdk.services.kinesis.model.ShardFilter; +import software.amazon.awssdk.services.kinesis.model.ShardFilterType; import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.common.InitialPositionInStream; @@ -85,9 +89,10 @@ public class HierarchicalShardSyncer { * @param shardDetector * @param leaseRefresher * @param initialPosition + * @param scope * @param cleanupLeasesOfCompletedShards * @param ignoreUnexpectedChildShards - * @param scope + * @param garbageCollectLeases * @throws DependencyException * @throws InvalidStateException * @throws ProvisionedThroughputException @@ -96,20 +101,24 @@ public class HierarchicalShardSyncer { // CHECKSTYLE:OFF CyclomaticComplexity public synchronized void checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector, final 
LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition, - final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards, - final MetricsScope scope) throws DependencyException, InvalidStateException, - ProvisionedThroughputException, KinesisClientLibIOException { - final List latestShards = getShardList(shardDetector); - checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, cleanupLeasesOfCompletedShards, - ignoreUnexpectedChildShards, scope, latestShards); + final MetricsScope scope, final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards, + final boolean garbageCollectLeases, final boolean isLeaseTableEmpty) + throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { + final List latestShards = isLeaseTableEmpty ? + getShardListAtInitialPosition(shardDetector, initialPosition) : getShardList(shardDetector); + checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, latestShards, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, garbageCollectLeases, + isLeaseTableEmpty); } //Provide a pre-collcted list of shards to avoid calling ListShards API public synchronized void checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector, - final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition, final boolean cleanupLeasesOfCompletedShards, - final boolean ignoreUnexpectedChildShards, final MetricsScope scope, List latestShards) - throws DependencyException, InvalidStateException, - ProvisionedThroughputException, KinesisClientLibIOException { + final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition, + List latestShards, final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards, + final MetricsScope scope, final boolean garbageCollectLeases, final 
boolean isLeaseTableEmpty) + throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { + + //TODO: Need to add multistream support for this https://sim.amazon.com/issues/KinesisLTR-191 + if (!CollectionUtils.isNullOrEmpty(latestShards)) { log.debug("Num shards: {}", latestShards.size()); } @@ -125,8 +134,10 @@ public class HierarchicalShardSyncer { getLeasesForStream(shardDetector.streamIdentifier(), leaseRefresher) : leaseRefresher.listLeases(); final MultiStreamArgs multiStreamArgs = new MultiStreamArgs(isMultiStreamMode, shardDetector.streamIdentifier()); - final List newLeasesToCreate = determineNewLeasesToCreate(latestShards, currentLeases, initialPosition, - inconsistentShardIds, multiStreamArgs); + final LeaseSynchronizer leaseSynchronizer = isLeaseTableEmpty ? new EmptyLeaseTableSynchronizer() : + new NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap); + final List newLeasesToCreate = determineNewLeasesToCreate(leaseSynchronizer, latestShards, currentLeases, + initialPosition, inconsistentShardIds, multiStreamArgs); log.debug("Num new leases to create: {}", newLeasesToCreate.size()); for (Lease lease : newLeasesToCreate) { long startTime = System.currentTimeMillis(); @@ -140,8 +151,10 @@ public class HierarchicalShardSyncer { } final List trackedLeases = new ArrayList<>(currentLeases); trackedLeases.addAll(newLeasesToCreate); - cleanupGarbageLeases(shardDetector, latestShards, trackedLeases, leaseRefresher, multiStreamArgs); - if (cleanupLeasesOfCompletedShards) { + if (!isLeaseTableEmpty && garbageCollectLeases) { + cleanupGarbageLeases(shardDetector, latestShards, trackedLeases, leaseRefresher, multiStreamArgs); + } + if (!isLeaseTableEmpty && cleanupLeasesOfCompletedShards) { cleanupLeasesOfFinishedShards(currentLeases, shardIdToShardMap, shardIdToChildShardIdsMap, trackedLeases, leaseRefresher, multiStreamArgs); } @@ -299,55 +312,69 @@ public class 
HierarchicalShardSyncer { return shardIdToChildShardIdsMap; } - private static List getShardList(@NonNull final ShardDetector shardDetector) throws KinesisClientLibIOException { - final List shards = shardDetector.listShards(); - if (shards == null) { - throw new KinesisClientLibIOException( - "Stream is not in ACTIVE OR UPDATING state - will retry getting the shard list."); + /** + * Helper method to resolve the correct shard filter to use when listing shards from a position in a stream. + * @param initialPositionInStreamExtended + * @return ShardFilter shard filter for the corresponding position in the stream. + */ + private static ShardFilter getShardFilterFromInitialPosition(InitialPositionInStreamExtended initialPositionInStreamExtended) { + + ShardFilter.Builder builder = ShardFilter.builder(); + + switch (initialPositionInStreamExtended.getInitialPositionInStream()) { + case LATEST: + builder = builder.type(ShardFilterType.AT_LATEST); + break; + case TRIM_HORIZON: + builder = builder.type(ShardFilterType.AT_TRIM_HORIZON); + break; + case AT_TIMESTAMP: + builder = builder.type(ShardFilterType.AT_TIMESTAMP).timestamp(initialPositionInStreamExtended.getTimestamp().toInstant()); + break; } - return shards; + return builder.build(); + } + + private static List getShardListAtInitialPosition(@NonNull final ShardDetector shardDetector, + InitialPositionInStreamExtended initialPositionInStreamExtended) throws KinesisClientLibIOException { + final ShardFilter shardFilter = getShardFilterFromInitialPosition(initialPositionInStreamExtended); + final Optional<List<Shard>> shards = Optional.ofNullable(shardDetector.listShardsWithFilter(shardFilter)); /* ofNullable, not of: a null shard list must reach orElseThrow below instead of raising NullPointerException */ + + return shards.orElseThrow(() -> new KinesisClientLibIOException("Stream " + shardDetector.streamIdentifier().streamName() + + " is not in ACTIVE OR UPDATING state - will retry getting the shard list.")); + } + + private static List getShardList(@NonNull final ShardDetector shardDetector) throws KinesisClientLibIOException { + final Optional<List<Shard>> 
shards = Optional.ofNullable(shardDetector.listShards()); /* ofNullable, not of: keeps the null check of the removed implementation so a null list becomes KinesisClientLibIOException */ + + return shards.orElseThrow(() -> new KinesisClientLibIOException("Stream " + shardDetector.streamIdentifier().streamName() + + " is not in ACTIVE OR UPDATING state - will retry getting the shard list.")); } /** * Determine new leases to create and their initial checkpoint. * Note: Package level access only for testing purposes. * - * For each open (no ending sequence number) shard without open parents that doesn't already have a lease, - * determine if it is a descendent of any shard which is or will be processed (e.g. for which a lease exists): - * If so, set checkpoint of the shard to TrimHorizon and also create leases for ancestors if needed. - * If not, set checkpoint of the shard to the initial position specified by the client. - * To check if we need to create leases for ancestors, we use the following rules: - * * If we began (or will begin) processing data for a shard, then we must reach end of that shard before - * we begin processing data from any of its descendants. - * * A shard does not start processing data until data from all its parents has been processed. - * Note, if the initial position is LATEST and a shard has two parents and only one is a descendant - we'll create - * leases corresponding to both the parents - the parent shard which is not a descendant will have - * its checkpoint set to Latest. - * - * We assume that if there is an existing lease for a shard, then either: - * * we have previously created a lease for its parent (if it was needed), or - * * the parent shard has expired. 
- * - * For example: - * Shard structure (each level depicts a stream segment): - * 0 1 2 3 4 5 - shards till epoch 102 - * \ / \ / | | - * 6 7 4 5 - shards from epoch 103 - 205 - * \ / | / \ - * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) - * Current leases: (3, 4, 5) - * New leases to create: (2, 6, 7, 8, 9, 10) - * - * The leases returned are sorted by the starting sequence number - following the same order - * when persisting the leases in DynamoDB will ensure that we recover gracefully if we fail - * before creating all the leases. + * @param leaseSynchronizer determines the strategy we'll be using to update any new leases. + * @param shards List of all shards in Kinesis (we'll create new leases based on this set) + * @param currentLeases List of current leases + * @param initialPosition One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. We'll start fetching records from that + * location in the shard (when an application starts up for the first time - and there are no checkpoints). + * @param inconsistentShardIds Set of child shard ids having open parents. + * @param multiStreamArgs determines if we are using multistream mode. + * @return List of new leases to create sorted by starting sequenceNumber of the corresponding shard + */ + static List determineNewLeasesToCreate(final LeaseSynchronizer leaseSynchronizer, final List shards, + final List currentLeases, final InitialPositionInStreamExtended initialPosition, + final Set inconsistentShardIds, final MultiStreamArgs multiStreamArgs) { + return leaseSynchronizer.determineNewLeasesToCreate(shards, currentLeases, initialPosition, inconsistentShardIds, multiStreamArgs); + } + + /** + * Determine new leases to create and their initial checkpoint. + * Note: Package level access only for testing purposes. 
* - * If a shard has no existing lease, is open, and is a descendant of a parent which is still open, we ignore it - * here; this happens when the list of shards is inconsistent, which could be due to pagination delay for very - * high shard count streams (i.e., dynamodb streams for tables with thousands of partitions). This can only - * currently happen here if ignoreUnexpectedChildShards was true in syncShardleases. - * - * + * @param leaseSynchronizer determines the strategy we'll be using to update any new leases. * @param shards List of all shards in Kinesis (we'll create new leases based on this set) * @param currentLeases List of current leases * @param initialPosition One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. We'll start fetching records from that @@ -355,92 +382,27 @@ public class HierarchicalShardSyncer { * @param inconsistentShardIds Set of child shard ids having open parents. * @return List of new leases to create sorted by starting sequenceNumber of the corresponding shard */ - static List determineNewLeasesToCreate(final List shards, final List currentLeases, - final InitialPositionInStreamExtended initialPosition, final Set inconsistentShardIds, - final MultiStreamArgs multiStreamArgs) { - final Map shardIdToNewLeaseMap = new HashMap<>(); - final Map shardIdToShardMapOfAllKinesisShards = constructShardIdToShardMap(shards); - - final Set shardIdsOfCurrentLeases = currentLeases.stream() - .peek(lease -> log.debug("Existing lease: {}", lease)) - .map(lease -> shardIdFromLeaseDeducer.apply(lease, multiStreamArgs)) - .collect(Collectors.toSet()); - - final List openShards = getOpenShards(shards); - final Map memoizationContext = new HashMap<>(); - - // Iterate over the open shards and find those that don't have any lease entries. 
- for (Shard shard : openShards) { - final String shardId = shard.shardId(); - log.debug("Evaluating leases for open shard {} and its ancestors.", shardId); - if (shardIdsOfCurrentLeases.contains(shardId)) { - log.debug("Lease for shardId {} already exists. Not creating a lease", shardId); - } else if (inconsistentShardIds.contains(shardId)) { - log.info("shardId {} is an inconsistent child. Not creating a lease", shardId); - } else { - log.debug("Need to create a lease for shardId {}", shardId); - final Lease newLease = multiStreamArgs.isMultiStreamMode() ? - newKCLMultiStreamLease(shard, multiStreamArgs.streamIdentifier()) : - newKCLLease(shard); - final boolean isDescendant = checkIfDescendantAndAddNewLeasesForAncestors(shardId, initialPosition, - shardIdsOfCurrentLeases, shardIdToShardMapOfAllKinesisShards, shardIdToNewLeaseMap, - memoizationContext, multiStreamArgs); - - /** - * If the shard is a descendant and the specified initial position is AT_TIMESTAMP, then the - * checkpoint should be set to AT_TIMESTAMP, else to TRIM_HORIZON. For AT_TIMESTAMP, we will add a - * lease just like we do for TRIM_HORIZON. However we will only return back records with server-side - * timestamp at or after the specified initial position timestamp. - * - * Shard structure (each level depicts a stream segment): - * 0 1 2 3 4 5 - shards till epoch 102 - * \ / \ / | | - * 6 7 4 5 - shards from epoch 103 - 205 - * \ / | /\ - * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) - * - * Current leases: empty set - * - * For the above example, suppose the initial position in stream is set to AT_TIMESTAMP with - * timestamp value 206. We will then create new leases for all the shards (with checkpoint set to - * AT_TIMESTAMP), including the ancestor shards with epoch less than 206. 
However as we begin - * processing the ancestor shards, their checkpoints would be updated to SHARD_END and their leases - * would then be deleted since they won't have records with server-side timestamp at/after 206. And - * after that we will begin processing the descendant shards with epoch at/after 206 and we will - * return the records that meet the timestamp requirement for these shards. - */ - if (isDescendant - && !initialPosition.getInitialPositionInStream().equals(InitialPositionInStream.AT_TIMESTAMP)) { - newLease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); - } else { - newLease.checkpoint(convertToCheckpoint(initialPosition)); - } - log.debug("Set checkpoint of {} to {}", newLease.leaseKey(), newLease.checkpoint()); - shardIdToNewLeaseMap.put(shardId, newLease); - } - } - - final List newLeasesToCreate = new ArrayList<>(shardIdToNewLeaseMap.values()); - final Comparator startingSequenceNumberComparator = new StartingSequenceNumberAndShardIdBasedComparator( - shardIdToShardMapOfAllKinesisShards, multiStreamArgs); - newLeasesToCreate.sort(startingSequenceNumberComparator); - return newLeasesToCreate; - } - - static List determineNewLeasesToCreate(final List shards, final List currentLeases, - final InitialPositionInStreamExtended initialPosition, final Set inconsistentShardIds) { - return determineNewLeasesToCreate(shards, currentLeases, initialPosition, inconsistentShardIds, + static List determineNewLeasesToCreate(final LeaseSynchronizer leaseSynchronizer, final List shards, + final List currentLeases, final InitialPositionInStreamExtended initialPosition,final Set inconsistentShardIds) { + return determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, initialPosition, inconsistentShardIds, new MultiStreamArgs(false, null)); } /** * Determine new leases to create and their initial checkpoint. * Note: Package level access only for testing purposes. 
+ * + * @param leaseSynchronizer determines the strategy we'll be using to update any new leases. + * @param shards List of all shards in Kinesis (we'll create new leases based on this set) + * @param currentLeases List of current leases + * @param initialPosition One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. We'll start fetching records from that + * location in the shard (when an application starts up for the first time - and there are no checkpoints). + * @return List of new leases to create sorted by starting sequenceNumber of the corresponding shard */ - static List determineNewLeasesToCreate(final List shards, final List currentLeases, - final InitialPositionInStreamExtended initialPosition) { + static List determineNewLeasesToCreate(final LeaseSynchronizer leaseSynchronizer, final List shards, + final List currentLeases, final InitialPositionInStreamExtended initialPosition) { final Set inconsistentShardIds = new HashSet<>(); - return determineNewLeasesToCreate(shards, currentLeases, initialPosition, inconsistentShardIds); + return determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, initialPosition, inconsistentShardIds); } /** @@ -682,6 +644,7 @@ public class HierarchicalShardSyncer { if (!CollectionUtils.isNullOrEmpty(leasesOfClosedShards)) { assertClosedShardsAreCoveredOrAbsent(shardIdToShardMap, shardIdToChildShardIdsMap, shardIdsOfClosedShards); + //TODO: Verify before LTR launch that ending sequence number is still returned from the service. Comparator startingSequenceNumberComparator = new StartingSequenceNumberAndShardIdBasedComparator( shardIdToShardMap, multiStreamArgs); leasesOfClosedShards.sort(startingSequenceNumberComparator); @@ -864,4 +827,210 @@ public class HierarchicalShardSyncer { private final StreamIdentifier streamIdentifier; } + /** + * Interface to determine how to create new leases. + */ + @VisibleForTesting + interface LeaseSynchronizer { + /** + * Determines how to create leases. 
+ * @param shards + * @param currentLeases + * @param initialPosition + * @param inconsistentShardIds + * @param multiStreamArgs + * @return + */ + List determineNewLeasesToCreate(List shards, List currentLeases, + InitialPositionInStreamExtended initialPosition, Set inconsistentShardIds, + MultiStreamArgs multiStreamArgs); + } + + /** + * Class to help create leases when the table is initially empty. + */ + @Slf4j + @AllArgsConstructor + static class EmptyLeaseTableSynchronizer implements LeaseSynchronizer { + + /** + * Determines how to create leases when the lease table is initially empty. For this, we read all shards where + * the KCL is reading from. For any shards which are closed, we will discover their child shards through GetRecords + * child shard information. + * + * @param shards + * @param currentLeases + * @param initialPosition + * @param inconsistentShardIds + * @param multiStreamArgs + * @return + */ + @Override + public List determineNewLeasesToCreate(List shards, List currentLeases, + InitialPositionInStreamExtended initialPosition, Set inconsistentShardIds, MultiStreamArgs multiStreamArgs) { + final Map shardIdToShardMapOfAllKinesisShards = constructShardIdToShardMap(shards); + + currentLeases.stream().peek(lease -> log.debug("Existing lease: {}", lease)) + .map(lease -> shardIdFromLeaseDeducer.apply(lease, multiStreamArgs)) + .collect(Collectors.toSet()); + + final List newLeasesToCreate = getLeasesToCreateForOpenAndClosedShards(initialPosition, shards, multiStreamArgs); + + //TODO: Verify before LTR launch that ending sequence number is still returned from the service. + final Comparator startingSequenceNumberComparator = + new StartingSequenceNumberAndShardIdBasedComparator(shardIdToShardMapOfAllKinesisShards, multiStreamArgs); + newLeasesToCreate.sort(startingSequenceNumberComparator); + return newLeasesToCreate; + } + + /** + * Helper method to create leases. 
For an empty lease table, we will be creating leases for all shards + * regardless of if they are open or closed. Closed shards will be unblocked via child shard information upon + * reaching SHARD_END. + */ + private List getLeasesToCreateForOpenAndClosedShards(InitialPositionInStreamExtended initialPosition, + List shards, MultiStreamArgs multiStreamArgs) { + final Map shardIdToNewLeaseMap = new HashMap<>(); + + for (Shard shard : shards) { + final String shardId = shard.shardId(); + final Lease lease = multiStreamArgs.isMultiStreamMode() ? + newKCLMultiStreamLease(shard, multiStreamArgs.streamIdentifier) : newKCLLease(shard); + lease.checkpoint(convertToCheckpoint(initialPosition)); + + log.debug("Need to create a lease for shard with shardId {}", shardId); + + shardIdToNewLeaseMap.put(shardId, lease); + } + + return new ArrayList(shardIdToNewLeaseMap.values()); + } + } + + + /** + * Class to help create leases when the lease table is not initially empty. + */ + @Slf4j + @AllArgsConstructor + static class NonEmptyLeaseTableSynchronizer implements LeaseSynchronizer { + + private final ShardDetector shardDetector; + private final Map shardIdToShardMap; + private final Map> shardIdToChildShardIdsMap; + + /** + * Determine new leases to create and their initial checkpoint. + * Note: Package level access only for testing purposes. + *

+ * For each open (no ending sequence number) shard without open parents that doesn't already have a lease, + * determine if it is a descendent of any shard which is or will be processed (e.g. for which a lease exists): + * If so, set checkpoint of the shard to TrimHorizon and also create leases for ancestors if needed. + * If not, set checkpoint of the shard to the initial position specified by the client. + * To check if we need to create leases for ancestors, we use the following rules: + * * If we began (or will begin) processing data for a shard, then we must reach end of that shard before + * we begin processing data from any of its descendants. + * * A shard does not start processing data until data from all its parents has been processed. + * Note, if the initial position is LATEST and a shard has two parents and only one is a descendant - we'll create + * leases corresponding to both the parents - the parent shard which is not a descendant will have + * its checkpoint set to Latest. + *

+ * We assume that if there is an existing lease for a shard, then either: + * * we have previously created a lease for its parent (if it was needed), or + * * the parent shard has expired. + *

+ * For example: + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5 - shards till epoch 102 + * \ / \ / | | + * 6 7 4 5 - shards from epoch 103 - 205 + * \ / | / \ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Current leases: (3, 4, 5) + * New leases to create: (2, 6, 7, 8, 9, 10) + *

+ * The leases returned are sorted by the starting sequence number - following the same order + * when persisting the leases in DynamoDB will ensure that we recover gracefully if we fail + * before creating all the leases. + *

+ * If a shard has no existing lease, is open, and is a descendant of a parent which is still open, we ignore it + * here; this happens when the list of shards is inconsistent, which could be due to pagination delay for very + * high shard count streams (i.e., dynamodb streams for tables with thousands of partitions). This can only + * currently happen here if ignoreUnexpectedChildShards was true in syncShardleases. + * + * @return List of new leases to create sorted by starting sequenceNumber of the corresponding shard + */ + @Override + public synchronized List determineNewLeasesToCreate(List shards, List currentLeases, + InitialPositionInStreamExtended initialPosition, Set inconsistentShardIds, MultiStreamArgs multiStreamArgs) { + final Map shardIdToNewLeaseMap = new HashMap<>(); + final Map shardIdToShardMapOfAllKinesisShards = constructShardIdToShardMap(shards); + + final Set shardIdsOfCurrentLeases = currentLeases.stream() + .peek(lease -> log.debug("Existing lease: {}", lease)) + .map(lease -> shardIdFromLeaseDeducer.apply(lease, multiStreamArgs)) + .collect(Collectors.toSet()); + + final List openShards = getOpenShards(shards); + final Map memoizationContext = new HashMap<>(); + + // Iterate over the open shards and find those that don't have any lease entries. + for (Shard shard : openShards) { + final String shardId = shard.shardId(); + log.debug("Evaluating leases for open shard {} and its ancestors.", shardId); + if (shardIdsOfCurrentLeases.contains(shardId)) { + log.debug("Lease for shardId {} already exists. Not creating a lease", shardId); + } else if (inconsistentShardIds.contains(shardId)) { + log.info("shardId {} is an inconsistent child. Not creating a lease", shardId); + } else { + log.debug("Need to create a lease for shardId {}", shardId); + final Lease newLease = multiStreamArgs.isMultiStreamMode() ? 
+ newKCLMultiStreamLease(shard, multiStreamArgs.streamIdentifier()) : + newKCLLease(shard); + final boolean isDescendant = checkIfDescendantAndAddNewLeasesForAncestors(shardId, initialPosition, + shardIdsOfCurrentLeases, shardIdToShardMapOfAllKinesisShards, shardIdToNewLeaseMap, + memoizationContext, multiStreamArgs); + + /** + * If the shard is a descendant and the specified initial position is AT_TIMESTAMP, then the + * checkpoint should be set to AT_TIMESTAMP, else to TRIM_HORIZON. For AT_TIMESTAMP, we will add a + * lease just like we do for TRIM_HORIZON. However we will only return back records with server-side + * timestamp at or after the specified initial position timestamp. + * + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5 - shards till epoch 102 + * \ / \ / | | + * 6 7 4 5 - shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * + * Current leases: empty set + * + * For the above example, suppose the initial position in stream is set to AT_TIMESTAMP with + * timestamp value 206. We will then create new leases for all the shards (with checkpoint set to + * AT_TIMESTAMP), including the ancestor shards with epoch less than 206. However as we begin + * processing the ancestor shards, their checkpoints would be updated to SHARD_END and their leases + * would then be deleted since they won't have records with server-side timestamp at/after 206. And + * after that we will begin processing the descendant shards with epoch at/after 206 and we will + * return the records that meet the timestamp requirement for these shards. 
+ */ + if (isDescendant + && !initialPosition.getInitialPositionInStream().equals(InitialPositionInStream.AT_TIMESTAMP)) { + newLease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); + } else { + newLease.checkpoint(convertToCheckpoint(initialPosition)); + } + log.debug("Set checkpoint of {} to {}", newLease.leaseKey(), newLease.checkpoint()); + shardIdToNewLeaseMap.put(shardId, newLease); + } + } + + final List newLeasesToCreate = new ArrayList<>(shardIdToNewLeaseMap.values()); + //TODO: Verify before LTR launch that ending sequence number is still returned from the service. + final Comparator startingSequenceNumberComparator = new StartingSequenceNumberAndShardIdBasedComparator( + shardIdToShardMapOfAllKinesisShards, multiStreamArgs); + newLeasesToCreate.sort(startingSequenceNumberComparator); + return newLeasesToCreate; + } + } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java index 0c495558..c0c3bdee 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java @@ -42,6 +42,7 @@ import software.amazon.awssdk.services.kinesis.model.ListShardsRequest; import software.amazon.awssdk.services.kinesis.model.ListShardsResponse; import software.amazon.awssdk.services.kinesis.model.ResourceInUseException; import software.amazon.awssdk.services.kinesis.model.Shard; +import software.amazon.awssdk.services.kinesis.model.ShardFilter; import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.common.FutureUtils; @@ -149,12 +150,18 @@ public class KinesisShardDetector implements ShardDetector { @Override @Synchronized public List listShards() { + return listShardsWithFilter(null); + } + + 
@Override + @Synchronized + public List listShardsWithFilter(ShardFilter shardFilter) { final List shards = new ArrayList<>(); ListShardsResponse result; String nextToken = null; do { - result = listShards(nextToken); + result = listShards(shardFilter, nextToken); if (result == null) { /* @@ -172,13 +179,13 @@ public class KinesisShardDetector implements ShardDetector { return shards; } - private ListShardsResponse listShards(final String nextToken) { + private ListShardsResponse listShards(ShardFilter shardFilter, final String nextToken) { final AWSExceptionManager exceptionManager = new AWSExceptionManager(); exceptionManager.add(LimitExceededException.class, t -> t); exceptionManager.add(ResourceInUseException.class, t -> t); exceptionManager.add(KinesisException.class, t -> t); - ListShardsRequest.Builder request = KinesisRequestsBuilder.listShardsRequestBuilder(); + ListShardsRequest.Builder request = KinesisRequestsBuilder.listShardsRequestBuilder().shardFilter(shardFilter); if (StringUtils.isEmpty(nextToken)) { request = request.streamName(streamIdentifier.streamName()); } else { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardDetector.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardDetector.java index 6ae012e6..1b2822ee 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardDetector.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardDetector.java @@ -16,6 +16,7 @@ package software.amazon.kinesis.leases; import software.amazon.awssdk.services.kinesis.model.Shard; +import software.amazon.awssdk.services.kinesis.model.ShardFilter; import software.amazon.kinesis.common.StreamIdentifier; import java.util.List; @@ -28,6 +29,8 @@ public interface ShardDetector { List listShards(); + List listShardsWithFilter(ShardFilter shardFilter); + default StreamIdentifier streamIdentifier() { throw new UnsupportedOperationException("StreamName not 
available"); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java index d7548a55..f5c7ab8a 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java @@ -46,6 +46,7 @@ public class ShardSyncTask implements ConsumerTask { @NonNull private final InitialPositionInStreamExtended initialPosition; private final boolean cleanupLeasesUponShardCompletion; + private final boolean garbageCollectLeases; private final boolean ignoreUnexpectedChildShards; private final long shardSyncTaskIdleTimeMillis; @NonNull @@ -66,8 +67,10 @@ public class ShardSyncTask implements ConsumerTask { boolean shardSyncSuccess = true; try { - hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, - cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, scope); + hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, + initialPosition, scope, cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, garbageCollectLeases, /* boolean order must follow the callee: cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, isLeaseTableEmpty */ + leaseRefresher.isLeaseTableEmpty()); + if (shardSyncTaskIdleTimeMillis > 0) { Thread.sleep(shardSyncTaskIdleTimeMillis); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java index 8c8e0464..a52ac650 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java @@ -48,6 +48,7 @@ public class ShardSyncTaskManager { @NonNull private final InitialPositionInStreamExtended initialPositionInStream; private final boolean 
cleanupLeasesUponShardCompletion; + private final boolean garbageCollectLeases; private final boolean ignoreUnexpectedChildShards; private final long shardSyncIdleTimeMillis; @NonNull @@ -84,6 +85,7 @@ public class ShardSyncTaskManager { this.leaseRefresher = leaseRefresher; this.initialPositionInStream = initialPositionInStream; this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion; + this.garbageCollectLeases = true; this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards; this.shardSyncIdleTimeMillis = shardSyncIdleTimeMillis; this.executorService = executorService; @@ -114,6 +116,7 @@ public class ShardSyncTaskManager { this.leaseRefresher = leaseRefresher; this.initialPositionInStream = initialPositionInStream; this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion; + this.garbageCollectLeases = true; this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards; this.shardSyncIdleTimeMillis = shardSyncIdleTimeMillis; this.executorService = executorService; @@ -128,6 +131,7 @@ public class ShardSyncTaskManager { leaseRefresher, initialPositionInStream, cleanupLeasesUponShardCompletion, + garbageCollectLeases, ignoreUnexpectedChildShards, shardSyncIdleTimeMillis, hierarchicalShardSyncer, @@ -166,6 +170,7 @@ public class ShardSyncTaskManager { leaseRefresher, initialPositionInStream, cleanupLeasesUponShardCompletion, + garbageCollectLeases, ignoreUnexpectedChildShards, shardSyncIdleTimeMillis, hierarchicalShardSyncer, diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/ShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/ShardSyncer.java index 4e9245f6..c0ed2d2a 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/ShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/ShardSyncer.java @@ -19,6 +19,7 @@ import software.amazon.kinesis.metrics.MetricsScope; 
@Deprecated public class ShardSyncer { private static final HierarchicalShardSyncer HIERARCHICAL_SHARD_SYNCER = new HierarchicalShardSyncer(); + private static final boolean garbageCollectLeases = true; /** *

NOTE: This method is deprecated and will be removed in a future release.

@@ -41,6 +42,6 @@ public class ShardSyncer { final MetricsScope scope) throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { HIERARCHICAL_SHARD_SYNCER.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, - cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope); + scope, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, leaseRefresher.isLeaseTableEmpty()); } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index 2bfcd358..f4538be6 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -16,7 +16,6 @@ package software.amazon.kinesis.lifecycle; import com.google.common.annotations.VisibleForTesting; -import com.sun.org.apache.bcel.internal.generic.LUSHR; import lombok.NonNull; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -28,7 +27,6 @@ import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.LeaseCoordinator; -import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.HierarchicalShardSyncer; @@ -42,8 +40,6 @@ import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; -import java.util.ArrayList; -import java.util.Collection; import java.util.List; /** @@ -69,6 +65,8 @@ public class ShutdownTask 
implements ConsumerTask { @NonNull private final InitialPositionInStreamExtended initialPositionInStream; private final boolean cleanupLeasesOfCompletedShards; + private final boolean garbageCollectLeases = false; + private final boolean isLeaseTableEmpty= false; private final boolean ignoreUnexpectedChildShards; @NonNull private final LeaseCoordinator leaseCoordinator; @@ -155,7 +153,8 @@ public class ShutdownTask implements ConsumerTask { log.debug("Looking for child shards of shard {}", shardInfo.shardId()); // create leases for the child shards hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseCoordinator.leaseRefresher(), - initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, latestShards); + initialPositionInStream, latestShards, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, garbageCollectLeases, + isLeaseTableEmpty); log.debug("Finished checking for child shards of shard {}", shardInfo.shardId()); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java index 0cc50c2a..d881b776 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java @@ -21,6 +21,7 @@ package software.amazon.kinesis.leases; import static org.hamcrest.CoreMatchers.equalTo; import static org.junit.Assert.assertThat; import static org.mockito.Matchers.any; +import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.never; @@ -41,6 +42,7 @@ import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.Stream; 
import org.apache.commons.lang3.StringUtils; import org.junit.Before; @@ -53,6 +55,8 @@ import org.mockito.runners.MockitoJUnitRunner; import software.amazon.awssdk.services.kinesis.model.HashKeyRange; import software.amazon.awssdk.services.kinesis.model.SequenceNumberRange; import software.amazon.awssdk.services.kinesis.model.Shard; +import software.amazon.awssdk.services.kinesis.model.ShardFilter; +import software.amazon.awssdk.services.kinesis.model.ShardFilterType; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.StreamIdentifier; @@ -80,6 +84,7 @@ public class HierarchicalShardSyncerTest { private static final HierarchicalShardSyncer.MultiStreamArgs MULTI_STREAM_ARGS = new HierarchicalShardSyncer.MultiStreamArgs( MULTISTREAM_MODE_ON, StreamIdentifier.multiStreamInstance(STREAM_IDENTIFIER)); + private final boolean garbageCollectLeases = true; private final boolean cleanupLeasesOfCompletedShards = true; private final boolean ignoreUnexpectedChildShards = false; @@ -112,9 +117,10 @@ public class HierarchicalShardSyncerTest { public void testDetermineNewLeasesToCreateNoShards() { final List shards = Collections.emptyList(); final List leases = Collections.emptyList(); + final HierarchicalShardSyncer.LeaseSynchronizer emptyLeaseTableSynchronizer = new HierarchicalShardSyncer.EmptyLeaseTableSynchronizer(); - assertThat(HierarchicalShardSyncer.determineNewLeasesToCreate(shards, leases, INITIAL_POSITION_LATEST).isEmpty(), - equalTo(true)); + assertThat(HierarchicalShardSyncer.determineNewLeasesToCreate(emptyLeaseTableSynchronizer, shards, leases, + INITIAL_POSITION_LATEST).isEmpty(), equalTo(true)); } /** @@ -123,10 +129,11 @@ public class HierarchicalShardSyncerTest { @Test public void testDetermineNewLeasesToCreateNoShardsForMultiStream() { final List shards = Collections.emptyList(); final List leases = Collections.emptyList(); + final 
HierarchicalShardSyncer.LeaseSynchronizer emptyLeaseTableSynchronizer = new HierarchicalShardSyncer.EmptyLeaseTableSynchronizer(); assertThat(HierarchicalShardSyncer - .determineNewLeasesToCreate(shards, leases, INITIAL_POSITION_LATEST, new HashSet<>(), MULTI_STREAM_ARGS) - .isEmpty(), equalTo(true)); + .determineNewLeasesToCreate(emptyLeaseTableSynchronizer, shards, leases, INITIAL_POSITION_LATEST, + new HashSet<>(), MULTI_STREAM_ARGS).isEmpty(), equalTo(true)); } /** @@ -141,9 +148,10 @@ public class HierarchicalShardSyncerTest { final List shards = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange), ShardObjectHelper.newShard(shardId1, null, null, sequenceRange)); final List currentLeases = Collections.emptyList(); + final HierarchicalShardSyncer.LeaseSynchronizer emptyLeaseTableSynchronizer = new HierarchicalShardSyncer.EmptyLeaseTableSynchronizer(); - final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, - INITIAL_POSITION_LATEST); + final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(emptyLeaseTableSynchronizer, + shards, currentLeases, INITIAL_POSITION_LATEST); final Set newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set expectedLeaseShardIds = new HashSet<>(Arrays.asList(shardId0, shardId1)); @@ -163,9 +171,10 @@ public class HierarchicalShardSyncerTest { final List shards = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange), ShardObjectHelper.newShard(shardId1, null, null, sequenceRange)); final List currentLeases = Collections.emptyList(); + final HierarchicalShardSyncer.LeaseSynchronizer emptyLeaseTableSynchronizer = new HierarchicalShardSyncer.EmptyLeaseTableSynchronizer(); - final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, - INITIAL_POSITION_LATEST, new HashSet<>(), MULTI_STREAM_ARGS); + final List newLeases = 
HierarchicalShardSyncer.determineNewLeasesToCreate(emptyLeaseTableSynchronizer, + shards, currentLeases, INITIAL_POSITION_LATEST, new HashSet<>(), MULTI_STREAM_ARGS); final Set newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set expectedLeaseIds = new HashSet<>( toMultiStreamLeaseList(Arrays.asList(shardId0, shardId1))); @@ -175,7 +184,7 @@ public class HierarchicalShardSyncerTest { } /** - * Test determineNewLeasesToCreate() where there are no leases and no resharding operations have been performed, but + * Test determineNewLeasesToCreate() where there is one lease and no resharding operations have been performed, but * one of the shards was marked as inconsistent. */ @Test @@ -183,16 +192,24 @@ public class HierarchicalShardSyncerTest { final String shardId0 = "shardId-0"; final String shardId1 = "shardId-1"; final String shardId2 = "shardId-2"; + final String shardId3 = "shardId-3"; final SequenceNumberRange sequenceRange = ShardObjectHelper.newSequenceNumberRange("342980", null); - final List shards = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange), + final List shardsWithLeases = Arrays.asList(ShardObjectHelper.newShard(shardId3, null, null, sequenceRange)); + final List shardsWithoutLeases = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange), ShardObjectHelper.newShard(shardId1, null, null, sequenceRange), ShardObjectHelper.newShard(shardId2, shardId1, null, sequenceRange)); - final List currentLeases = Collections.emptyList(); + final List shards = Stream.of(shardsWithLeases, shardsWithoutLeases).flatMap(x -> x.stream()).collect(Collectors.toList()); + final List currentLeases = createLeasesFromShards(shardsWithLeases, ExtendedSequenceNumber.LATEST, "foo"); final Set inconsistentShardIds = new HashSet<>(Collections.singletonList(shardId2)); - final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, + Map shardIdToShardMap 
= HierarchicalShardSyncer.constructShardIdToShardMap(shards); + Map> shardIdToChildShardIdsMap = HierarchicalShardSyncer.constructShardIdToChildShardIdsMap(shardIdToShardMap); + final HierarchicalShardSyncer.LeaseSynchronizer leaseSynchronizer = + new HierarchicalShardSyncer.NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap); + + final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, INITIAL_POSITION_LATEST, inconsistentShardIds); final Set newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set expectedLeaseShardIds = new HashSet<>(Arrays.asList(shardId0, shardId1)); @@ -209,16 +226,24 @@ public class HierarchicalShardSyncerTest { final String shardId0 = "shardId-0"; final String shardId1 = "shardId-1"; final String shardId2 = "shardId-2"; + final String shardId3 = "shardId-3"; final SequenceNumberRange sequenceRange = ShardObjectHelper.newSequenceNumberRange("342980", null); - final List shards = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange), + final List shardsWithLeases = Arrays.asList(ShardObjectHelper.newShard(shardId3, null, null, sequenceRange)); + final List shardsWithoutLeases = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange), ShardObjectHelper.newShard(shardId1, null, null, sequenceRange), ShardObjectHelper.newShard(shardId2, shardId1, null, sequenceRange)); - final List currentLeases = Collections.emptyList(); + final List shards = Stream.of(shardsWithLeases, shardsWithoutLeases).flatMap(x -> x.stream()).collect(Collectors.toList()); + final List currentLeases = new ArrayList(createMultiStreamLeasesFromShards(shardsWithLeases, ExtendedSequenceNumber.LATEST, "foo")); final Set inconsistentShardIds = new HashSet<>(Collections.singletonList(shardId2)); - final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, + Map 
shardIdToShardMap = HierarchicalShardSyncer.constructShardIdToShardMap(shards); + Map> shardIdToChildShardIdsMap = HierarchicalShardSyncer.constructShardIdToChildShardIdsMap(shardIdToShardMap); + final HierarchicalShardSyncer.LeaseSynchronizer leaseSynchronizer = + new HierarchicalShardSyncer.NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap); + + final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, INITIAL_POSITION_LATEST, inconsistentShardIds, MULTI_STREAM_ARGS); final Set newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set expectedLeaseShardIds = new HashSet<>( @@ -258,7 +283,7 @@ public class HierarchicalShardSyncerTest { hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, - cleanupLeasesOfCompletedShards, false, SCOPE); + SCOPE, cleanupLeasesOfCompletedShards, false, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); final Set expectedShardIds = new HashSet<>( Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10")); @@ -292,7 +317,7 @@ public class HierarchicalShardSyncerTest { setupMultiStream(); hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, - cleanupLeasesOfCompletedShards, false, SCOPE); + SCOPE, cleanupLeasesOfCompletedShards, false, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); final Set expectedShardIds = new HashSet<>( toMultiStreamLeaseList(Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10"))); @@ -334,7 +359,8 @@ public class HierarchicalShardSyncerTest { hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, - cleanupLeasesOfCompletedShards, false, SCOPE, latestShards); + latestShards, cleanupLeasesOfCompletedShards, false, SCOPE, 
garbageCollectLeases, + dynamoDBLeaseRefresher.isLeaseTableEmpty()); final Set expectedShardIds = new HashSet<>( Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10")); @@ -370,7 +396,8 @@ public class HierarchicalShardSyncerTest { setupMultiStream(); hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, - cleanupLeasesOfCompletedShards, false, SCOPE, latestShards); + latestShards, cleanupLeasesOfCompletedShards, false, SCOPE, garbageCollectLeases, + dynamoDBLeaseRefresher.isLeaseTableEmpty()); final Set expectedShardIds = new HashSet<>( toMultiStreamLeaseList(Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10"))); @@ -406,7 +433,8 @@ public class HierarchicalShardSyncerTest { hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, - cleanupLeasesOfCompletedShards, false, SCOPE, new ArrayList()); + new ArrayList(), cleanupLeasesOfCompletedShards, false, SCOPE, garbageCollectLeases, + dynamoDBLeaseRefresher.isLeaseTableEmpty()); final Set expectedShardIds = new HashSet<>(); @@ -446,7 +474,7 @@ public class HierarchicalShardSyncerTest { try { hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, - INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, false, SCOPE); + INITIAL_POSITION_TRIM_HORIZON, SCOPE, cleanupLeasesOfCompletedShards, false, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); } finally { verify(shardDetector).listShards(); verify(dynamoDBLeaseRefresher, never()).listLeases(); @@ -466,7 +494,8 @@ public class HierarchicalShardSyncerTest { setupMultiStream(); try { hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, - INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, false, SCOPE); + INITIAL_POSITION_TRIM_HORIZON, SCOPE, cleanupLeasesOfCompletedShards, false, 
garbageCollectLeases, + dynamoDBLeaseRefresher.isLeaseTableEmpty()); } finally { verify(shardDetector).listShards(); verify(dynamoDBLeaseRefresher, never()).listLeases(); @@ -501,7 +530,7 @@ public class HierarchicalShardSyncerTest { hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, - cleanupLeasesOfCompletedShards, true, SCOPE); + SCOPE, cleanupLeasesOfCompletedShards, true, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); final List leases = leaseCaptor.getAllValues(); final Set leaseKeys = leases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); @@ -546,7 +575,7 @@ public class HierarchicalShardSyncerTest { setupMultiStream(); hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, - cleanupLeasesOfCompletedShards, true, SCOPE); + SCOPE, cleanupLeasesOfCompletedShards, true, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); final List leases = leaseCaptor.getAllValues(); final Set leaseKeys = leases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); @@ -602,7 +631,7 @@ public class HierarchicalShardSyncerTest { // Initial call: No leases present, create leases. hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, - cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); + SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); final Set createLeases = new HashSet<>(leaseCreateCaptor.getAllValues()); @@ -617,7 +646,7 @@ public class HierarchicalShardSyncerTest { // Second call: Leases present, with shardId-0 being at ShardEnd causing cleanup. 
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, - cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); + SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); final List deleteLeases = leaseDeleteCaptor.getAllValues(); final Set shardIds = deleteLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set sequenceNumbers = deleteLeases.stream().map(Lease::checkpoint) @@ -677,7 +706,7 @@ public class HierarchicalShardSyncerTest { // Initial call: Call to create leases. hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, - cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); + SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); final Set createLeases = new HashSet<>(leaseCreateCaptor.getAllValues()); @@ -694,7 +723,7 @@ public class HierarchicalShardSyncerTest { // Second call: Leases already present. ShardId-0 is at ShardEnd and needs to be cleaned up. Delete fails. hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, - cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); + SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); } finally { List deleteLeases = leaseDeleteCaptor.getAllValues(); Set shardIds = deleteLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); @@ -718,7 +747,7 @@ public class HierarchicalShardSyncerTest { // Final call: Leases already present. ShardId-0 is at ShardEnd and needs to be cleaned up. Delete passes. 
hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, - cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); + SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); deleteLeases = leaseDeleteCaptor.getAllValues(); @@ -779,7 +808,7 @@ public class HierarchicalShardSyncerTest { // Initial call: Call to create leases. Fails on ListLeases hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, - cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); + SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); } finally { verify(shardDetector, times(1)).listShards(); verify(dynamoDBLeaseRefresher, times(1)).listLeases(); @@ -789,7 +818,7 @@ public class HierarchicalShardSyncerTest { // Second call: Leases not present, leases will be created. hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, - cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); + SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); final Set createLeases = new HashSet<>(leaseCreateCaptor.getAllValues()); final Set expectedCreateLeases = new HashSet<>(createLeasesFromShards(shards, sequenceNumber, null)); @@ -804,7 +833,7 @@ public class HierarchicalShardSyncerTest { // Final call: Leases present, belongs to TestOwner, shardId-0 is at ShardEnd should be cleaned up. 
hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, - cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); + SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); final List deleteLeases = leaseDeleteCaptor.getAllValues(); final Set shardIds = deleteLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); @@ -870,7 +899,7 @@ public class HierarchicalShardSyncerTest { // Initial call: No leases present, create leases. Create lease Fails hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, - cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); + SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); } finally { verify(shardDetector, times(1)).listShards(); verify(dynamoDBLeaseRefresher, times(1)).listLeases(); @@ -879,7 +908,7 @@ public class HierarchicalShardSyncerTest { hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, - cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); + SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); final Set createLeases = new HashSet<>(leaseCreateCaptor.getAllValues()); final Set expectedCreateLeases = new HashSet<>(createLeasesFromShards(shards, sequenceNumber, null)); @@ -894,7 +923,7 @@ public class HierarchicalShardSyncerTest { // Final call: Leases are present, shardId-0 is at ShardEnd needs to be cleaned up. 
hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, - cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); + SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); final List deleteLeases = leaseDeleteCaptor.getAllValues(); final Set shardIds = deleteLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); @@ -984,7 +1013,7 @@ public class HierarchicalShardSyncerTest { doNothing().when(dynamoDBLeaseRefresher).deleteLease(leaseCaptor.capture()); hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, - INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); + INITIAL_POSITION_TRIM_HORIZON, SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); assertThat(leaseCaptor.getAllValues().size(), equalTo(1)); assertThat(leaseCaptor.getValue(), equalTo(garbageLease)); @@ -1013,7 +1042,7 @@ public class HierarchicalShardSyncerTest { doNothing().when(dynamoDBLeaseRefresher).deleteLease(leaseCaptor.capture()); setupMultiStream(); hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, - INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); + INITIAL_POSITION_TRIM_HORIZON, SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); assertThat(leaseCaptor.getAllValues().size(), equalTo(1)); assertThat(leaseCaptor.getValue(), equalTo(garbageLease)); @@ -1045,7 +1074,7 @@ public class HierarchicalShardSyncerTest { hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, initialPosition, - cleanupLeasesOfCompletedShards, false, SCOPE); + SCOPE, 
cleanupLeasesOfCompletedShards, false, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); final List leases = leaseCaptor.getAllValues(); final Set leaseKeys = leases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); @@ -1069,6 +1098,7 @@ public class HierarchicalShardSyncerTest { final String shardId0 = "shardId-0"; final String shardId1 = "shardId-1"; final List currentLeases = new ArrayList<>(); + final HierarchicalShardSyncer.LeaseSynchronizer emptyLeaseTableSynchronizer = new HierarchicalShardSyncer.EmptyLeaseTableSynchronizer(); final SequenceNumberRange sequenceRange = ShardObjectHelper.newSequenceNumberRange("342980", null); final List shards = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange), @@ -1080,7 +1110,7 @@ public class HierarchicalShardSyncerTest { final Set expectedLeaseShardIds = new HashSet<>(Arrays.asList(shardId0, shardId1)); for (InitialPositionInStreamExtended initialPosition : initialPositions) { - final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, + final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(emptyLeaseTableSynchronizer, shards, currentLeases, initialPosition); assertThat(newLeases.size(), equalTo(2)); @@ -1092,18 +1122,33 @@ public class HierarchicalShardSyncerTest { } } + /** + * Tests that leases are not created for closed shards. 
+ */ @Test public void testDetermineNewLeasesToCreateIgnoreClosedShard() { final String lastShardId = "shardId-1"; - final List currentLeases = new ArrayList<>(); - final List shards = Arrays.asList( + final List shardsWithoutLeases = Arrays.asList( ShardObjectHelper.newShard("shardId-0", null, null, ShardObjectHelper.newSequenceNumberRange("303", "404")), ShardObjectHelper.newShard(lastShardId, null, null, ShardObjectHelper.newSequenceNumberRange("405", null))); - final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, + final List shardsWithLeases = Arrays.asList(ShardObjectHelper.newShard("shardId-2", null, + null, ShardObjectHelper.newSequenceNumberRange("202", "302"))); + + final List shards = Stream.of(shardsWithLeases, shardsWithoutLeases).flatMap(x -> x.stream()).collect(Collectors.toList()); + final List currentLeases = createLeasesFromShards(shardsWithLeases, ExtendedSequenceNumber.LATEST, "foo"); + final Set inconsistentShardIds = Collections.emptySet(); + + Map shardIdToShardMap = HierarchicalShardSyncer.constructShardIdToShardMap(shards); + Map> shardIdToChildShardIdsMap = HierarchicalShardSyncer.constructShardIdToChildShardIdsMap(shardIdToShardMap); + + final HierarchicalShardSyncer.LeaseSynchronizer leaseSynchronizer = + new HierarchicalShardSyncer.NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap); + + final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, INITIAL_POSITION_LATEST); assertThat(newLeases.size(), equalTo(1)); @@ -1125,9 +1170,15 @@ public class HierarchicalShardSyncerTest { final List shards = constructShardListForGraphA(); final List currentLeases = Arrays.asList(newLease("shardId-3"), newLease("shardId-4"), newLease("shardId-5")); + final Map shardIdToShardMap = HierarchicalShardSyncer.constructShardIdToShardMap(shards); + final Map> shardIdToChildShardIdsMap = HierarchicalShardSyncer + 
.constructShardIdToChildShardIdsMap(shardIdToShardMap); - final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, - INITIAL_POSITION_LATEST); + final HierarchicalShardSyncer.LeaseSynchronizer nonEmptyLeaseTableSynchronizer = + new HierarchicalShardSyncer.NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap); + + final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(nonEmptyLeaseTableSynchronizer, + shards, currentLeases, INITIAL_POSITION_LATEST); final Map expectedShardIdCheckpointMap = new HashMap<>(); expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.TRIM_HORIZON); @@ -1161,8 +1212,15 @@ public class HierarchicalShardSyncerTest { final List currentLeases = Arrays.asList(newLease("shardId-4"), newLease("shardId-5"), newLease("shardId-7")); - final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, - INITIAL_POSITION_LATEST); + final Map shardIdToShardMap = HierarchicalShardSyncer.constructShardIdToShardMap(shards); + final Map> shardIdToChildShardIdsMap = HierarchicalShardSyncer + .constructShardIdToChildShardIdsMap(shardIdToShardMap); + + final HierarchicalShardSyncer.LeaseSynchronizer nonEmptyLeaseTableSynchronizer = + new HierarchicalShardSyncer.NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap); + + final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(nonEmptyLeaseTableSynchronizer, + shards, currentLeases, INITIAL_POSITION_LATEST); final Map expectedShardIdCheckpointMap = new HashMap<>(); expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.TRIM_HORIZON); @@ -1194,8 +1252,15 @@ public class HierarchicalShardSyncerTest { final List currentLeases = Arrays.asList(newLease("shardId-3"), newLease("shardId-4"), newLease("shardId-5")); - final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, - 
INITIAL_POSITION_TRIM_HORIZON); + final Map shardIdToShardMap = HierarchicalShardSyncer.constructShardIdToShardMap(shards); + final Map> shardIdToChildShardIdsMap = HierarchicalShardSyncer + .constructShardIdToChildShardIdsMap(shardIdToShardMap); + + final HierarchicalShardSyncer.LeaseSynchronizer nonEmptyLeaseTableSynchronizer = + new HierarchicalShardSyncer.NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap); + + final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(nonEmptyLeaseTableSynchronizer, + shards, currentLeases, INITIAL_POSITION_TRIM_HORIZON); final Set leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final List checkpoints = newLeases.stream().map(Lease::checkpoint) @@ -1229,8 +1294,15 @@ public class HierarchicalShardSyncerTest { final List currentLeases = Arrays.asList(newLease("shardId-4"), newLease("shardId-5"), newLease("shardId-7")); - final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, - INITIAL_POSITION_TRIM_HORIZON); + final Map shardIdToShardMap = HierarchicalShardSyncer.constructShardIdToShardMap(shards); + final Map> shardIdToChildShardIdsMap = HierarchicalShardSyncer + .constructShardIdToChildShardIdsMap(shardIdToShardMap); + + final HierarchicalShardSyncer.LeaseSynchronizer nonEmptyLeaseTableSynchronizer = + new HierarchicalShardSyncer.NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap); + + final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(nonEmptyLeaseTableSynchronizer, + shards, currentLeases, INITIAL_POSITION_TRIM_HORIZON); final Set leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final List checkpoints = newLeases.stream().map(Lease::checkpoint) @@ -1259,8 +1331,15 @@ public class HierarchicalShardSyncerTest { final List shards = constructShardListForGraphB(); final List currentLeases = new 
ArrayList<>(); - final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, - INITIAL_POSITION_TRIM_HORIZON); + final Map shardIdToShardMap = HierarchicalShardSyncer.constructShardIdToShardMap(shards); + final Map> shardIdToChildShardIdsMap = HierarchicalShardSyncer + .constructShardIdToChildShardIdsMap(shardIdToShardMap); + + final HierarchicalShardSyncer.LeaseSynchronizer nonEmptyLeaseTableSynchronizer = + new HierarchicalShardSyncer.NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap); + + final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(nonEmptyLeaseTableSynchronizer, + shards, currentLeases, INITIAL_POSITION_TRIM_HORIZON); final Set leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final List checkpoints = newLeases.stream().map(Lease::checkpoint) @@ -1294,8 +1373,15 @@ public class HierarchicalShardSyncerTest { final List currentLeases = Arrays.asList(newLease("shardId-3"), newLease("shardId-4"), newLease("shardId-5")); - final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, - INITIAL_POSITION_AT_TIMESTAMP); + final Map shardIdToShardMap = HierarchicalShardSyncer.constructShardIdToShardMap(shards); + final Map> shardIdToChildShardIdsMap = HierarchicalShardSyncer + .constructShardIdToChildShardIdsMap(shardIdToShardMap); + + final HierarchicalShardSyncer.LeaseSynchronizer nonEmptyLeaseTableSynchronizer = + new HierarchicalShardSyncer.NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap); + + final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(nonEmptyLeaseTableSynchronizer, + shards, currentLeases, INITIAL_POSITION_AT_TIMESTAMP); final Set leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final List checkpoints = newLeases.stream().map(Lease::checkpoint) .collect(Collectors.toList()); @@ -1328,8 
+1414,15 @@ public class HierarchicalShardSyncerTest { final List currentLeases = Arrays.asList(newLease("shardId-4"), newLease("shardId-5"), newLease("shardId-7")); - final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, - INITIAL_POSITION_AT_TIMESTAMP); + final Map shardIdToShardMap = HierarchicalShardSyncer.constructShardIdToShardMap(shards); + final Map> shardIdToChildShardIdsMap = HierarchicalShardSyncer + .constructShardIdToChildShardIdsMap(shardIdToShardMap); + + final HierarchicalShardSyncer.LeaseSynchronizer nonEmptyLeaseTableSynchronizer = + new HierarchicalShardSyncer.NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap); + + final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(nonEmptyLeaseTableSynchronizer, + shards, currentLeases, INITIAL_POSITION_AT_TIMESTAMP); final Set leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final List checkpoints = newLeases.stream().map(Lease::checkpoint) .collect(Collectors.toList()); @@ -1355,8 +1448,15 @@ public class HierarchicalShardSyncerTest { final List shards = constructShardListForGraphB(); final List currentLeases = new ArrayList<>(); - final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, - INITIAL_POSITION_AT_TIMESTAMP); + final Map shardIdToShardMap = HierarchicalShardSyncer.constructShardIdToShardMap(shards); + final Map> shardIdToChildShardIdsMap = HierarchicalShardSyncer + .constructShardIdToChildShardIdsMap(shardIdToShardMap); + + final HierarchicalShardSyncer.LeaseSynchronizer nonEmptyLeaseTableSynchronizer = + new HierarchicalShardSyncer.NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap); + + final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(nonEmptyLeaseTableSynchronizer, + shards, currentLeases, INITIAL_POSITION_AT_TIMESTAMP); final Set leaseKeys = 
newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final List checkpoints = newLeases.stream().map(Lease::checkpoint) .collect(Collectors.toList()); @@ -1515,7 +1615,74 @@ public class HierarchicalShardSyncerTest { assertThat(newLeaseMap.isEmpty(), equalTo(true)); } -// /** + /** + * Tests that when reading from LATEST (the tip of the stream), we use the AT_LATEST shard filter. + * @throws Exception + */ + @Test + public void testEmptyLeaseTableBootstrapUsesShardFilterWithAtLatest() throws Exception { + ShardFilter shardFilter = ShardFilter.builder().type(ShardFilterType.AT_LATEST).build(); + testEmptyLeaseTableBootstrapUsesListShardsWithFilter(INITIAL_POSITION_LATEST, shardFilter); + } + + /** + * Tests that when reading from TRIM_HORIZON, we use the AT_TRIM_HORIZON shard filter. + * @throws Exception + */ + @Test + public void testEmptyLeaseTableBootstrapUsesShardFilterWithAtTrimHorizon() throws Exception { + ShardFilter shardFilter = ShardFilter.builder().type(ShardFilterType.AT_TRIM_HORIZON).build(); + testEmptyLeaseTableBootstrapUsesListShardsWithFilter(INITIAL_POSITION_TRIM_HORIZON, shardFilter); + } + + /** + * Tests that when reading from AT_TIMESTAMP, we use the AT_TIMESTAMP shard filter. 
+ * @throws Exception + */ + @Test + public void testEmptyLeaseTableBootstrapUsesShardFilterWithAtTimestamp() throws Exception { + ShardFilter shardFilter = ShardFilter.builder().type(ShardFilterType.AT_TIMESTAMP).timestamp(new Date(1000L).toInstant()).build(); + testEmptyLeaseTableBootstrapUsesListShardsWithFilter(INITIAL_POSITION_AT_TIMESTAMP, shardFilter); + } + + public void testEmptyLeaseTableBootstrapUsesListShardsWithFilter(InitialPositionInStreamExtended initialPosition, ShardFilter shardFilter) throws Exception { + final String shardId0 = "shardId-0"; + final List shards = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, + ShardObjectHelper.newSequenceNumberRange("1", null), ShardObjectHelper.newHashKeyRange(ShardObjectHelper.MIN_HASH_KEY, ShardObjectHelper.MAX_HASH_KEY))); + + when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(true); + when(shardDetector.listShardsWithFilter(shardFilter)).thenReturn(shards); + + hierarchicalShardSyncer + .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, initialPosition, + SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); + + verify(shardDetector, atLeast(1)).listShardsWithFilter(shardFilter); + verify(shardDetector, never()).listShards(); + } + + @Test + public void testNonEmptyLeaseTableUsesListShards() throws Exception { + final String shardId0 = "shardId-0"; + final String shardId1 = "shardId-1"; + + final List shardsWithLeases = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"))); + final List shardsWithoutLeases = Arrays.asList(ShardObjectHelper.newShard(shardId1, null, null, ShardObjectHelper.newSequenceNumberRange("3", "4"))); + + final List currentLeases = createLeasesFromShards(shardsWithLeases, ExtendedSequenceNumber.LATEST, "foo"); + + when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(false); + 
when(shardDetector.listShards()).thenReturn(shardsWithoutLeases); + when(dynamoDBLeaseRefresher.listLeases()).thenReturn(currentLeases); + + hierarchicalShardSyncer + .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, + SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); + + verify(shardDetector, atLeast(1)).listShards(); + } + +// /** // * Test CheckIfDescendantAndAddNewLeasesForAncestors - two parents, there is a lease for one parent. // */ // @Test diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardSyncTaskIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardSyncTaskIntegrationTest.java index 42b826d1..ce6ce386 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardSyncTaskIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardSyncTaskIntegrationTest.java @@ -118,7 +118,8 @@ public class ShardSyncTaskIntegrationTest { leaseRefresher.deleteAll(); Set shardIds = shardDetector.listShards().stream().map(Shard::shardId).collect(Collectors.toSet()); ShardSyncTask syncTask = new ShardSyncTask(shardDetector, leaseRefresher, - InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST), false, false, 0L, + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST), + false, true, false, 0L, hierarchicalShardSyncer, NULL_METRICS_FACTORY); syncTask.call(); List leases = leaseRefresher.listLeases(); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java index 6af62edb..220fe4a5 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java @@ -18,6 +18,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.never; @@ -42,7 +43,6 @@ import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException; import software.amazon.kinesis.leases.HierarchicalShardSyncer; -import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.ShardDetector; @@ -127,6 +127,9 @@ public class ShutdownTaskTest { */ @Test public final void testCallWhenSyncingShardsThrows() throws Exception { + final boolean garbageCollectLeases = false; + final boolean isLeaseTableEmpty = false; + List latestShards = constructShardListGraphA(); when(shardDetector.listShards()).thenReturn(latestShards); when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END); @@ -136,8 +139,8 @@ public class ShutdownTaskTest { throw new KinesisClientLibIOException("KinesisClientLibIOException"); }).when(hierarchicalShardSyncer) .checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, INITIAL_POSITION_TRIM_HORIZON, - cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, - NULL_METRICS_FACTORY.createMetrics(), latestShards); + latestShards, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, + NULL_METRICS_FACTORY.createMetrics(), garbageCollectLeases, isLeaseTableEmpty); final TaskResult result = task.call(); 
assertNotNull(result.getException());