diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/EmptyLeaseTableSynchronizer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/EmptyLeaseTableSynchronizer.java
new file mode 100644
index 00000000..b22ec8ac
--- /dev/null
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/EmptyLeaseTableSynchronizer.java
@@ -0,0 +1,100 @@
+package software.amazon.kinesis.leases;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import software.amazon.awssdk.services.kinesis.model.Shard;
+import software.amazon.kinesis.common.InitialPositionInStreamExtended;
+import software.amazon.kinesis.leases.exceptions.DependencyException;
+import software.amazon.kinesis.leases.exceptions.InvalidStateException;
+import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static software.amazon.kinesis.leases.HierarchicalShardSyncer.constructShardIdToShardMap;
+import static software.amazon.kinesis.leases.HierarchicalShardSyncer.convertToCheckpoint;
+import static software.amazon.kinesis.leases.HierarchicalShardSyncer.newKCLLease;
+
+/**
+ * {@link LeaseSynchronizer} for an empty lease table: a lease is created for every shard it is given and
+ * both cleanup hooks are no-ops.
+ */
+@Slf4j
+@AllArgsConstructor
+public class EmptyLeaseTableSynchronizer implements LeaseSynchronizer {
+
+    /**
+     * Builds one lease per entry of {@code shards}, each checkpointed at {@code initialPosition}.
+     *
+     * @param shards shards to create leases for
+     * @param currentLeases existing leases; only logged at debug level here
+     * @param initialPosition position converted into the checkpoint of every new lease
+     * @param inconsistentShardIds not used by this implementation
+     * @param multiStreamArgs selects the lease type to create and is handed to the sort comparator
+     * @return the new leases, sorted with {@code StartingSequenceNumberAndShardIdBasedComparator}
+     */
+    @Override
+    public List<Lease> determineNewLeasesToCreate(List<Shard> shards, List<Lease> currentLeases,
+                                                  InitialPositionInStreamExtended initialPosition,
+                                                  Set<String> inconsistentShardIds,
+                                                  HierarchicalShardSyncer.MultiStreamArgs multiStreamArgs) {
+
+        final Map<String, Shard> shardIdToShardMapOfAllKinesisShards = constructShardIdToShardMap(shards);
+
+        currentLeases.forEach(lease -> log.debug("Existing lease: {}", lease));
+
+        final List<Lease> newLeasesToCreate =
+                getLeasesToCreateForOpenAndClosedShards(initialPosition, shards, multiStreamArgs);
+
+        final Comparator<Lease> startingSequenceNumberComparator =
+                new HierarchicalShardSyncer.StartingSequenceNumberAndShardIdBasedComparator(shardIdToShardMapOfAllKinesisShards, multiStreamArgs);
+        newLeasesToCreate.sort(startingSequenceNumberComparator);
+        return newLeasesToCreate;
+    }
+
+    @Override
+    public void cleanupGarbageLeases(List<Shard> shards, LeaseRefresher leaseRefresher, List<Lease> trackedLeases, HierarchicalShardSyncer.MultiStreamArgs multiStreamArgs) throws DependencyException, ProvisionedThroughputException, InvalidStateException {
+        // Nothing to do here.
+    }
+
+    @Override
+    public void cleanupLeasesOfFinishedShards(LeaseRefresher leaseRefresher, List<Lease> currentLeases, List<Lease> trackedLeases, HierarchicalShardSyncer.MultiStreamArgs multiStreamArgs) throws DependencyException, ProvisionedThroughputException, InvalidStateException {
+        // Nothing to do here.
+    }
+
+    /**
+     * Helper method to create leases. For an empty lease table, we will be creating leases for all shards
+     * regardless of if they are open or closed. Closed shards will be unblocked via child shard information upon
+     * reaching SHARD_END.
+     *
+     * @param initialPosition position converted into the checkpoint of each lease
+     * @param shards shards to create leases for
+     * @param multiStreamArgs selects {@code MultiStreamLease} creation when in multi-stream mode
+     * @return a new mutable list holding one lease per distinct shard id
+     */
+    private List<Lease> getLeasesToCreateForOpenAndClosedShards(InitialPositionInStreamExtended initialPosition,
+            List<Shard> shards, HierarchicalShardSyncer.MultiStreamArgs multiStreamArgs) {
+        final Map<String, Lease> shardIdToNewLeaseMap = new HashMap<>();
+
+        for (Shard shard : shards) {
+            final String shardId = shard.shardId();
+            // LeaseSynchronizer.shardIdFromLeaseDeducer casts leases to MultiStreamLease in multi-stream mode,
+            // so a plain Lease must not be created there.
+            final Lease lease = multiStreamArgs.isMultiStreamMode() ?
+                    HierarchicalShardSyncer.newKCLMultiStreamLease(shard, multiStreamArgs.streamIdentifier()) :
+                    newKCLLease(shard);
+            lease.checkpoint(convertToCheckpoint(initialPosition));
+
+            log.debug("Need to create a lease for shard with shardId {}", shardId);
+
+            shardIdToNewLeaseMap.put(shardId, lease);
+        }
+
+        return new ArrayList<>(shardIdToNewLeaseMap.values());
+    }
+}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java index f4143581..a474fc7a 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java @@ -24,6 +24,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.function.BiFunction; import java.util.function.Function; @@ -38,6 +39,8 @@ import lombok.NonNull; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import software.amazon.awssdk.services.kinesis.model.Shard; +import software.amazon.awssdk.services.kinesis.model.ShardFilter; +import software.amazon.awssdk.services.kinesis.model.ShardFilterType; import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.common.InitialPositionInStream; @@ -99,7 +102,8 @@ public class HierarchicalShardSyncer { final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards, final MetricsScope scope) throws DependencyException, InvalidStateException, ProvisionedThroughputException, 
KinesisClientLibIOException { - final List latestShards = getShardList(shardDetector); + final List latestShards = leaseRefresher.isLeaseTableEmpty() ? + getShardListAtInitialPosition(shardDetector, initialPosition) : getFullShardList(shardDetector); checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, latestShards); } @@ -125,8 +129,11 @@ public class HierarchicalShardSyncer { getLeasesForStream(shardDetector.streamIdentifier(), leaseRefresher) : leaseRefresher.listLeases(); final MultiStreamArgs multiStreamArgs = new MultiStreamArgs(isMultiStreamMode, shardDetector.streamIdentifier()); - final List newLeasesToCreate = determineNewLeasesToCreate(latestShards, currentLeases, initialPosition, - inconsistentShardIds, multiStreamArgs); + final LeaseSynchronizer leaseSynchronizer = leaseRefresher.isLeaseTableEmpty() ? + new EmptyLeaseTableSynchronizer() : + new NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap); + final List newLeasesToCreate = determineNewLeasesToCreate(leaseSynchronizer, latestShards, currentLeases, + initialPosition, inconsistentShardIds, multiStreamArgs); log.debug("Num new leases to create: {}", newLeasesToCreate.size()); for (Lease lease : newLeasesToCreate) { long startTime = System.currentTimeMillis(); @@ -140,13 +147,33 @@ public class HierarchicalShardSyncer { } final List trackedLeases = new ArrayList<>(currentLeases); trackedLeases.addAll(newLeasesToCreate); - cleanupGarbageLeases(shardDetector, latestShards, trackedLeases, leaseRefresher, multiStreamArgs); + leaseSynchronizer.cleanupGarbageLeases(latestShards, leaseRefresher, trackedLeases, multiStreamArgs); if (cleanupLeasesOfCompletedShards) { - cleanupLeasesOfFinishedShards(currentLeases, shardIdToShardMap, shardIdToChildShardIdsMap, trackedLeases, - leaseRefresher, multiStreamArgs); + leaseSynchronizer.cleanupLeasesOfFinishedShards(leaseRefresher, 
currentLeases, trackedLeases, multiStreamArgs); } } + static List determineNewLeasesToCreate(LeaseSynchronizer leaseSynchronizer, final List shards, + final List currentLeases, + final InitialPositionInStreamExtended initialPosition) { + final Set inconsistentShardIds = new HashSet<>(); + return determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, initialPosition, inconsistentShardIds); + } + + static List determineNewLeasesToCreate(LeaseSynchronizer leaseSynchronizer, final List shards, + final List currentLeases, final InitialPositionInStreamExtended initialPosition, + final Set inconsistentShardIds) { + return determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, initialPosition, inconsistentShardIds, + new MultiStreamArgs(false, null)); + } + + static List determineNewLeasesToCreate(LeaseSynchronizer leaseSynchronizer, final List shards, + final List currentLeases, final InitialPositionInStreamExtended initialPosition, + final Set inconsistentShardIds, final MultiStreamArgs multiStreamArgs) { + return leaseSynchronizer.determineNewLeasesToCreate(shards, currentLeases, initialPosition,inconsistentShardIds, + multiStreamArgs); + } + // CHECKSTYLE:ON CyclomaticComplexity /** Note: This method has package level access solely for testing purposes. @@ -200,76 +227,7 @@ public class HierarchicalShardSyncer { .flatMap(entry -> shardIdToChildShardIdsMap.get(entry.getKey()).stream()).collect(Collectors.toSet()); } - /** - * Note: this has package level access for testing purposes. - * Useful for asserting that we don't have an incomplete shard list following a reshard operation. - * We verify that if the shard is present in the shard list, it is closed and its hash key range - * is covered by its child shards. 
- * @param shardIdsOfClosedShards Id of the shard which is expected to be closed - * @return ShardIds of child shards (children of the expectedClosedShard) - * @throws KinesisClientLibIOException - */ - synchronized void assertClosedShardsAreCoveredOrAbsent(final Map shardIdToShardMap, - final Map> shardIdToChildShardIdsMap, final Set shardIdsOfClosedShards) - throws KinesisClientLibIOException { - final String exceptionMessageSuffix = "This can happen if we constructed the list of shards " - + " while a reshard operation was in progress."; - - for (String shardId : shardIdsOfClosedShards) { - final Shard shard = shardIdToShardMap.get(shardId); - if (shard == null) { - log.info("Shard {} is not present in Kinesis anymore.", shardId); - continue; - } - - final String endingSequenceNumber = shard.sequenceNumberRange().endingSequenceNumber(); - if (endingSequenceNumber == null) { - throw new KinesisClientLibIOException("Shard " + shardIdsOfClosedShards - + " is not closed. " + exceptionMessageSuffix); - } - final Set childShardIds = shardIdToChildShardIdsMap.get(shardId); - if (childShardIds == null) { - throw new KinesisClientLibIOException("Incomplete shard list: Closed shard " + shardId - + " has no children." 
+ exceptionMessageSuffix); - } - - assertHashRangeOfClosedShardIsCovered(shard, shardIdToShardMap, childShardIds); - } - } - - private synchronized void assertHashRangeOfClosedShardIsCovered(final Shard closedShard, - final Map shardIdToShardMap, final Set childShardIds) - throws KinesisClientLibIOException { - BigInteger minStartingHashKeyOfChildren = null; - BigInteger maxEndingHashKeyOfChildren = null; - - final BigInteger startingHashKeyOfClosedShard = new BigInteger(closedShard.hashKeyRange().startingHashKey()); - final BigInteger endingHashKeyOfClosedShard = new BigInteger(closedShard.hashKeyRange().endingHashKey()); - - for (String childShardId : childShardIds) { - final Shard childShard = shardIdToShardMap.get(childShardId); - final BigInteger startingHashKey = new BigInteger(childShard.hashKeyRange().startingHashKey()); - if (minStartingHashKeyOfChildren == null || startingHashKey.compareTo(minStartingHashKeyOfChildren) < 0) { - minStartingHashKeyOfChildren = startingHashKey; - } - - final BigInteger endingHashKey = new BigInteger(childShard.hashKeyRange().endingHashKey()); - if (maxEndingHashKeyOfChildren == null || endingHashKey.compareTo(maxEndingHashKeyOfChildren) > 0) { - maxEndingHashKeyOfChildren = endingHashKey; - } - } - - if (minStartingHashKeyOfChildren == null || maxEndingHashKeyOfChildren == null - || minStartingHashKeyOfChildren.compareTo(startingHashKeyOfClosedShard) > 0 - || maxEndingHashKeyOfChildren.compareTo(endingHashKeyOfClosedShard) < 0) { - throw new KinesisClientLibIOException(String.format( - "Incomplete shard list: hash key range of shard %s is not covered by its child shards.", - closedShard.shardId())); - } - - } - /** * Helper method to construct shardId->setOfChildShardIds map. * Note: This has package access for testing purposes only. 
@@ -299,149 +257,44 @@ public class HierarchicalShardSyncer { return shardIdToChildShardIdsMap; } - private static List getShardList(@NonNull final ShardDetector shardDetector) throws KinesisClientLibIOException { - final List shards = shardDetector.listShards(); - if (shards == null) { - throw new KinesisClientLibIOException( - "Stream is not in ACTIVE OR UPDATING state - will retry getting the shard list."); + private static ShardFilter getShardFilterFromInitialPosition(InitialPositionInStreamExtended initialPositionInStreamExtended) { + + ShardFilter.Builder builder = ShardFilter.builder(); + + switch (initialPositionInStreamExtended.getInitialPositionInStream()) { + case LATEST: + builder = builder.type(ShardFilterType.AT_LATEST); + break; + case TRIM_HORIZON: + builder = builder.type(ShardFilterType.AT_TRIM_HORIZON); + break; + case AT_TIMESTAMP: + builder = builder.type(ShardFilterType.AT_TIMESTAMP).timestamp(initialPositionInStreamExtended.getTimestamp().toInstant()); + break; } - return shards; + return builder.build(); } - /** - * Determine new leases to create and their initial checkpoint. - * Note: Package level access only for testing purposes. - * - * For each open (no ending sequence number) shard without open parents that doesn't already have a lease, - * determine if it is a descendent of any shard which is or will be processed (e.g. for which a lease exists): - * If so, set checkpoint of the shard to TrimHorizon and also create leases for ancestors if needed. - * If not, set checkpoint of the shard to the initial position specified by the client. - * To check if we need to create leases for ancestors, we use the following rules: - * * If we began (or will begin) processing data for a shard, then we must reach end of that shard before - * we begin processing data from any of its descendants. - * * A shard does not start processing data until data from all its parents has been processed. 
- * Note, if the initial position is LATEST and a shard has two parents and only one is a descendant - we'll create - * leases corresponding to both the parents - the parent shard which is not a descendant will have - * its checkpoint set to Latest. - * - * We assume that if there is an existing lease for a shard, then either: - * * we have previously created a lease for its parent (if it was needed), or - * * the parent shard has expired. - * - * For example: - * Shard structure (each level depicts a stream segment): - * 0 1 2 3 4 5 - shards till epoch 102 - * \ / \ / | | - * 6 7 4 5 - shards from epoch 103 - 205 - * \ / | / \ - * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) - * Current leases: (3, 4, 5) - * New leases to create: (2, 6, 7, 8, 9, 10) - * - * The leases returned are sorted by the starting sequence number - following the same order - * when persisting the leases in DynamoDB will ensure that we recover gracefully if we fail - * before creating all the leases. - * - * If a shard has no existing lease, is open, and is a descendant of a parent which is still open, we ignore it - * here; this happens when the list of shards is inconsistent, which could be due to pagination delay for very - * high shard count streams (i.e., dynamodb streams for tables with thousands of partitions). This can only - * currently happen here if ignoreUnexpectedChildShards was true in syncShardleases. - * - * - * @param shards List of all shards in Kinesis (we'll create new leases based on this set) - * @param currentLeases List of current leases - * @param initialPosition One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. We'll start fetching records from that - * location in the shard (when an application starts up for the first time - and there are no checkpoints). - * @param inconsistentShardIds Set of child shard ids having open parents. 
- * @return List of new leases to create sorted by starting sequenceNumber of the corresponding shard - */ - static List determineNewLeasesToCreate(final List shards, final List currentLeases, - final InitialPositionInStreamExtended initialPosition, final Set inconsistentShardIds, - final MultiStreamArgs multiStreamArgs) { - final Map shardIdToNewLeaseMap = new HashMap<>(); - final Map shardIdToShardMapOfAllKinesisShards = constructShardIdToShardMap(shards); + static List getShardListAtInitialPosition(@NonNull final ShardDetector shardDetector, + InitialPositionInStreamExtended initialPositionInStreamExtended) throws KinesisClientLibIOException { + final ShardFilter shardFilter = getShardFilterFromInitialPosition(initialPositionInStreamExtended); + final Optional<List<Shard>> shards = Optional.ofNullable(shardDetector.listShardsWithFilter(shardFilter)); /* ofNullable: a null shard list must reach orElseThrow instead of raising NullPointerException here */ - final Set shardIdsOfCurrentLeases = currentLeases.stream() - .peek(lease -> log.debug("Existing lease: {}", lease)) - .map(lease -> shardIdFromLeaseDeducer.apply(lease, multiStreamArgs)) - .collect(Collectors.toSet()); + log.debug("Retrieved {} shards with ShardFilter - {}.", shards.map(s -> s.size()).orElse(0), shardFilter); - final List openShards = getOpenShards(shards); - final Map memoizationContext = new HashMap<>(); - - // Iterate over the open shards and find those that don't have any lease entries. - for (Shard shard : openShards) { - final String shardId = shard.shardId(); - log.debug("Evaluating leases for open shard {} and its ancestors.", shardId); - if (shardIdsOfCurrentLeases.contains(shardId)) { - log.debug("Lease for shardId {} already exists. Not creating a lease", shardId); - } else if (inconsistentShardIds.contains(shardId)) { - log.info("shardId {} is an inconsistent child. Not creating a lease", shardId); - } else { - log.debug("Need to create a lease for shardId {}", shardId); - final Lease newLease = multiStreamArgs.isMultiStreamMode() ? 
- newKCLMultiStreamLease(shard, multiStreamArgs.streamIdentifier()) : - newKCLLease(shard); - final boolean isDescendant = checkIfDescendantAndAddNewLeasesForAncestors(shardId, initialPosition, - shardIdsOfCurrentLeases, shardIdToShardMapOfAllKinesisShards, shardIdToNewLeaseMap, - memoizationContext, multiStreamArgs); - - /** - * If the shard is a descendant and the specified initial position is AT_TIMESTAMP, then the - * checkpoint should be set to AT_TIMESTAMP, else to TRIM_HORIZON. For AT_TIMESTAMP, we will add a - * lease just like we do for TRIM_HORIZON. However we will only return back records with server-side - * timestamp at or after the specified initial position timestamp. - * - * Shard structure (each level depicts a stream segment): - * 0 1 2 3 4 5 - shards till epoch 102 - * \ / \ / | | - * 6 7 4 5 - shards from epoch 103 - 205 - * \ / | /\ - * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) - * - * Current leases: empty set - * - * For the above example, suppose the initial position in stream is set to AT_TIMESTAMP with - * timestamp value 206. We will then create new leases for all the shards (with checkpoint set to - * AT_TIMESTAMP), including the ancestor shards with epoch less than 206. However as we begin - * processing the ancestor shards, their checkpoints would be updated to SHARD_END and their leases - * would then be deleted since they won't have records with server-side timestamp at/after 206. And - * after that we will begin processing the descendant shards with epoch at/after 206 and we will - * return the records that meet the timestamp requirement for these shards. 
- */ - if (isDescendant - && !initialPosition.getInitialPositionInStream().equals(InitialPositionInStream.AT_TIMESTAMP)) { - newLease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); - } else { - newLease.checkpoint(convertToCheckpoint(initialPosition)); - } - log.debug("Set checkpoint of {} to {}", newLease.leaseKey(), newLease.checkpoint()); - shardIdToNewLeaseMap.put(shardId, newLease); - } - } - - final List newLeasesToCreate = new ArrayList<>(shardIdToNewLeaseMap.values()); - final Comparator startingSequenceNumberComparator = new StartingSequenceNumberAndShardIdBasedComparator( - shardIdToShardMapOfAllKinesisShards, multiStreamArgs); - newLeasesToCreate.sort(startingSequenceNumberComparator); - return newLeasesToCreate; + return shards.orElseThrow(() -> new KinesisClientLibIOException("Stream is not in ACTIVE OR UPDATING state - " + + "will retry getting the shard list.")); } - static List determineNewLeasesToCreate(final List shards, final List currentLeases, - final InitialPositionInStreamExtended initialPosition, final Set inconsistentShardIds) { - return determineNewLeasesToCreate(shards, currentLeases, initialPosition, inconsistentShardIds, - new MultiStreamArgs(false, null)); + static List getFullShardList(@NonNull final ShardDetector shardDetector) throws KinesisClientLibIOException { + final Optional<List<Shard>> shards = Optional.ofNullable(shardDetector.listShards()); /* ofNullable: a null shard list must reach orElseThrow instead of raising NullPointerException here */ + + log.debug("Retrieved {} shards.", shards.map(s -> s.size()).orElse(0)); + + return shards.orElseThrow(() -> new KinesisClientLibIOException("Stream is not in ACTIVE OR UPDATING state - " + + "will retry getting the shard list.")); } - /** - * Determine new leases to create and their initial checkpoint. - * Note: Package level access only for testing purposes. 
- */ - static List determineNewLeasesToCreate(final List shards, final List currentLeases, - final InitialPositionInStreamExtended initialPosition) { - final Set inconsistentShardIds = new HashSet<>(); - return determineNewLeasesToCreate(shards, currentLeases, initialPosition, inconsistentShardIds); - } /** * Note: Package level access for testing purposes only. @@ -571,49 +424,6 @@ public class HierarchicalShardSyncer { return parentShardIds; } - /** - * Delete leases corresponding to shards that no longer exist in the stream. Current scheme: Delete a lease if: - *
<ul> - * <li>The corresponding shard is not present in the list of Kinesis shards</li> - * <li>The parentShardIds listed in the lease are also not present in the list of Kinesis shards.</li> - * </ul>
- * - * @param shards - * List of all Kinesis shards (assumed to be a consistent snapshot - when stream is in Active state). - * @param trackedLeases - * List of - * @param leaseRefresher - * @throws KinesisClientLibIOException - * Thrown if we couldn't get a fresh shard list from Kinesis. - * @throws ProvisionedThroughputException - * @throws InvalidStateException - * @throws DependencyException - */ - private static void cleanupGarbageLeases(@NonNull final ShardDetector shardDetector, final List shards, - final List trackedLeases, final LeaseRefresher leaseRefresher, - final MultiStreamArgs multiStreamArgs) throws KinesisClientLibIOException, - DependencyException, InvalidStateException, ProvisionedThroughputException { - final Set kinesisShards = shards.stream().map(Shard::shardId).collect(Collectors.toSet()); - - // Check if there are leases for non-existent shards - final List garbageLeases = trackedLeases.stream() - .filter(lease -> isCandidateForCleanup(lease, kinesisShards, multiStreamArgs)).collect(Collectors.toList()); - - if (!CollectionUtils.isNullOrEmpty(garbageLeases)) { - log.info("Found {} candidate leases for cleanup. Refreshing list of" - + " Kinesis shards to pick up recent/latest shards", garbageLeases.size()); - final Set currentKinesisShardIds = getShardList(shardDetector).stream().map(Shard::shardId) - .collect(Collectors.toSet()); - - for (Lease lease : garbageLeases) { - if (isCandidateForCleanup(lease, currentKinesisShardIds, multiStreamArgs)) { - log.info("Deleting lease for shard {} as it is not present in Kinesis stream.", lease.leaseKey()); - leaseRefresher.deleteLease(lease); - } - } - } - } - /** * Note: This method has package level access, solely for testing purposes. * @@ -652,91 +462,6 @@ public class HierarchicalShardSyncer { return isCandidateForCleanup; } - /** - * Private helper method. 
- * Clean up leases for shards that meet the following criteria: - * a/ the shard has been fully processed (checkpoint is set to SHARD_END) - * b/ we've begun processing all the child shards: we have leases for all child shards and their checkpoint is not - * TRIM_HORIZON. - * - * @param currentLeases List of leases we evaluate for clean up - * @param shardIdToShardMap Map of shardId->Shard (assumed to include all Kinesis shards) - * @param shardIdToChildShardIdsMap Map of shardId->childShardIds (assumed to include all Kinesis shards) - * @param trackedLeases List of all leases we are tracking. - * @param leaseRefresher Lease refresher (will be used to delete leases) - * @throws DependencyException - * @throws InvalidStateException - * @throws ProvisionedThroughputException - * @throws KinesisClientLibIOException - */ - private synchronized void cleanupLeasesOfFinishedShards(final Collection currentLeases, - final Map shardIdToShardMap, final Map> shardIdToChildShardIdsMap, - final List trackedLeases, final LeaseRefresher leaseRefresher, - final MultiStreamArgs multiStreamArgs) throws DependencyException, - InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { - final List leasesOfClosedShards = currentLeases.stream() - .filter(lease -> lease.checkpoint().equals(ExtendedSequenceNumber.SHARD_END)) - .collect(Collectors.toList()); - final Set shardIdsOfClosedShards = leasesOfClosedShards.stream() - .map(lease -> shardIdFromLeaseDeducer.apply(lease, multiStreamArgs)).collect(Collectors.toSet()); - - if (!CollectionUtils.isNullOrEmpty(leasesOfClosedShards)) { - assertClosedShardsAreCoveredOrAbsent(shardIdToShardMap, shardIdToChildShardIdsMap, shardIdsOfClosedShards); - Comparator startingSequenceNumberComparator = new StartingSequenceNumberAndShardIdBasedComparator( - shardIdToShardMap, multiStreamArgs); - leasesOfClosedShards.sort(startingSequenceNumberComparator); - final Map trackedLeaseMap = trackedLeases.stream() - 
.collect(Collectors.toMap(lease -> shardIdFromLeaseDeducer.apply(lease, multiStreamArgs), Function.identity())); - - for (Lease leaseOfClosedShard : leasesOfClosedShards) { - final String closedShardId = shardIdFromLeaseDeducer.apply(leaseOfClosedShard, multiStreamArgs); - final Set childShardIds = shardIdToChildShardIdsMap.get(closedShardId); - if (closedShardId != null && !CollectionUtils.isNullOrEmpty(childShardIds)) { - cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseRefresher, multiStreamArgs); - } - } - } - } - - /** - * Delete lease for the closed shard. Rules for deletion are: - * a/ the checkpoint for the closed shard is SHARD_END, - * b/ there are leases for all the childShardIds and their checkpoint is NOT TRIM_HORIZON - * Note: This method has package level access solely for testing purposes. - * - * @param closedShardId Identifies the closed shard - * @param childShardIds ShardIds of children of the closed shard - * @param trackedLeases shardId->Lease map with all leases we are tracking (should not be null) - * @param leaseRefresher - * @throws ProvisionedThroughputException - * @throws InvalidStateException - * @throws DependencyException - */ - synchronized void cleanupLeaseForClosedShard(final String closedShardId, final Set childShardIds, - final Map trackedLeases, final LeaseRefresher leaseRefresher, final MultiStreamArgs multiStreamArgs) - throws DependencyException, InvalidStateException, ProvisionedThroughputException { - final Lease leaseForClosedShard = trackedLeases.get(closedShardId); - final List childShardLeases = childShardIds.stream().map(trackedLeases::get).filter(Objects::nonNull) - .collect(Collectors.toList()); - - if (leaseForClosedShard != null && leaseForClosedShard.checkpoint().equals(ExtendedSequenceNumber.SHARD_END) - && childShardLeases.size() == childShardIds.size()) { - boolean okayToDelete = true; - for (Lease lease : childShardLeases) { - if 
(lease.checkpoint().equals(ExtendedSequenceNumber.TRIM_HORIZON)) { - okayToDelete = false; - break; - } - } - - if (okayToDelete) { - log.info("Deleting lease for shard {} as it has been completely processed and processing of child " - + "shards has begun.", shardIdFromLeaseDeducer.apply(leaseForClosedShard, multiStreamArgs)); - leaseRefresher.deleteLease(leaseForClosedShard); - } - } - } - /** * Helper method to create a new Lease POJO for a shard. * Note: Package level access only for testing purposes @@ -744,7 +469,7 @@ public class HierarchicalShardSyncer { * @param shard * @return */ - private static Lease newKCLLease(final Shard shard) { + static Lease newKCLLease(final Shard shard) { Lease newLease = new Lease(); newLease.leaseKey(shard.shardId()); List parentShardIds = new ArrayList<>(2); @@ -760,7 +485,7 @@ public class HierarchicalShardSyncer { return newLease; } - private static Lease newKCLMultiStreamLease(final Shard shard, final StreamIdentifier streamIdentifier) { + static Lease newKCLMultiStreamLease(final Shard shard, final StreamIdentifier streamIdentifier) { MultiStreamLease newLease = new MultiStreamLease(); newLease.leaseKey(MultiStreamLease.getLeaseKey(streamIdentifier.serialize(), shard.shardId())); List parentShardIds = new ArrayList<>(2); @@ -799,7 +524,7 @@ public class HierarchicalShardSyncer { .peek(shard -> log.debug("Found open shard: {}", shard.shardId())).collect(Collectors.toList()); } - private static ExtendedSequenceNumber convertToCheckpoint(final InitialPositionInStreamExtended position) { + static ExtendedSequenceNumber convertToCheckpoint(final InitialPositionInStreamExtended position) { ExtendedSequenceNumber checkpoint = null; if (position.getInitialPositionInStream().equals(InitialPositionInStream.TRIM_HORIZON)) { @@ -817,7 +542,7 @@ public class HierarchicalShardSyncer { * */ @RequiredArgsConstructor - private static class StartingSequenceNumberAndShardIdBasedComparator implements Comparator, Serializable { + static class 
StartingSequenceNumberAndShardIdBasedComparator implements Comparator, Serializable { private static final long serialVersionUID = 1L; private final Map shardIdToShardMap; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java index 0c495558..c0c3bdee 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java @@ -42,6 +42,7 @@ import software.amazon.awssdk.services.kinesis.model.ListShardsRequest; import software.amazon.awssdk.services.kinesis.model.ListShardsResponse; import software.amazon.awssdk.services.kinesis.model.ResourceInUseException; import software.amazon.awssdk.services.kinesis.model.Shard; +import software.amazon.awssdk.services.kinesis.model.ShardFilter; import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.common.FutureUtils; @@ -149,12 +150,18 @@ public class KinesisShardDetector implements ShardDetector { @Override @Synchronized public List listShards() { + return listShardsWithFilter(null); + } + + @Override + @Synchronized + public List listShardsWithFilter(ShardFilter shardFilter) { final List shards = new ArrayList<>(); ListShardsResponse result; String nextToken = null; do { - result = listShards(nextToken); + result = listShards(shardFilter, nextToken); if (result == null) { /* @@ -172,13 +179,13 @@ public class KinesisShardDetector implements ShardDetector { return shards; } - private ListShardsResponse listShards(final String nextToken) { + private ListShardsResponse listShards(ShardFilter shardFilter, final String nextToken) { final AWSExceptionManager exceptionManager = new AWSExceptionManager(); exceptionManager.add(LimitExceededException.class, t -> 
t); exceptionManager.add(ResourceInUseException.class, t -> t); exceptionManager.add(KinesisException.class, t -> t); - ListShardsRequest.Builder request = KinesisRequestsBuilder.listShardsRequestBuilder(); + ListShardsRequest.Builder request = KinesisRequestsBuilder.listShardsRequestBuilder().shardFilter(shardFilter); if (StringUtils.isEmpty(nextToken)) { request = request.streamName(streamIdentifier.streamName()); } else { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseSynchronizer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseSynchronizer.java new file mode 100644 index 00000000..e062f259 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseSynchronizer.java @@ -0,0 +1,34 @@ +package software.amazon.kinesis.leases; + +import software.amazon.awssdk.services.kinesis.model.Shard; +import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.leases.exceptions.DependencyException; +import software.amazon.kinesis.leases.exceptions.InvalidStateException; +import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; + +import java.util.List; +import java.util.Set; +import java.util.function.BiFunction; + +public interface LeaseSynchronizer { + + BiFunction shardIdFromLeaseDeducer = + (lease, multiStreamArgs) -> + multiStreamArgs.isMultiStreamMode() ? 
+ ((MultiStreamLease) lease).shardId() : + lease.leaseKey(); + + List determineNewLeasesToCreate(List shards, List currentLeases, + InitialPositionInStreamExtended initialPosition, Set inconsistentShardIds, + HierarchicalShardSyncer.MultiStreamArgs multiStreamArgs); + + void cleanupGarbageLeases(List shards, LeaseRefresher leaseRefresher, List trackedLeases, + HierarchicalShardSyncer.MultiStreamArgs multiStreamArgs) + throws DependencyException, ProvisionedThroughputException, InvalidStateException; + + void cleanupLeasesOfFinishedShards(LeaseRefresher leaseRefresher, List currentLeases, List trackedLeases, + HierarchicalShardSyncer.MultiStreamArgs multiStreamArgs) + throws DependencyException, ProvisionedThroughputException, InvalidStateException; +} + + diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/NonEmptyLeaseTableSynchronizer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/NonEmptyLeaseTableSynchronizer.java new file mode 100644 index 00000000..ba8818e5 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/NonEmptyLeaseTableSynchronizer.java @@ -0,0 +1,314 @@ +package software.amazon.kinesis.leases; + +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import software.amazon.awssdk.services.kinesis.model.Shard; +import software.amazon.awssdk.utils.CollectionUtils; +import software.amazon.kinesis.common.InitialPositionInStream; +import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException; +import software.amazon.kinesis.leases.exceptions.DependencyException; +import software.amazon.kinesis.leases.exceptions.InvalidStateException; +import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; +import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; + +import java.math.BigInteger; +import java.util.ArrayList; +import 
java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static software.amazon.kinesis.leases.HierarchicalShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors; +import static software.amazon.kinesis.leases.HierarchicalShardSyncer.constructShardIdToShardMap; + +@Slf4j +@AllArgsConstructor +public class NonEmptyLeaseTableSynchronizer implements LeaseSynchronizer { + + private final ShardDetector shardDetector; + private final Map shardIdToShardMap; + private final Map> shardIdToChildShardIdsMap; + + /** + * Determine new leases to create and their initial checkpoint. + * Note: Package level access only for testing purposes. + *

+ * For each open (no ending sequence number) shard without open parents that doesn't already have a lease, + * determine if it is a descendant of any shard which is or will be processed (e.g. for which a lease exists): + * If so, set checkpoint of the shard to TrimHorizon (or to AT_TIMESTAMP when that is the initial position) and also create leases for ancestors if needed. + * If not, set checkpoint of the shard to the initial position specified by the client. + * To check if we need to create leases for ancestors, we use the following rules: + * * If we began (or will begin) processing data for a shard, then we must reach the end of that shard before + * we begin processing data from any of its descendants. + * * A shard does not start processing data until data from all its parents has been processed. + * Note, if the initial position is LATEST and a shard has two parents and only one is a descendant - we'll create + * leases corresponding to both the parents - the parent shard which is not a descendant will have + * its checkpoint set to Latest. + *

+ * We assume that if there is an existing lease for a shard, then either: + * * we have previously created a lease for its parent (if it was needed), or + * * the parent shard has expired. + *

+ * For example: + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5 - shards till epoch 102 + * \ / \ / | | + * 6 7 4 5 - shards from epoch 103 - 205 + * \ / | / \ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Current leases: (3, 4, 5) + * New leases to create: (2, 6, 7, 8, 9, 10) + *

+ * The leases returned are sorted by the starting sequence number - following the same order + * when persisting the leases in DynamoDB will ensure that we recover gracefully if we fail + * before creating all the leases. + *

+ * If a shard has no existing lease, is open, and is a descendant of a parent which is still open, we ignore it + * here; this happens when the list of shards is inconsistent, which could be due to pagination delay for very + * high shard count streams (i.e., dynamodb streams for tables with thousands of partitions). This can only + * currently happen here if ignoreUnexpectedChildShards was true in syncShardleases. + * + * @return List of new leases to create sorted by starting sequenceNumber of the corresponding shard + */ + @Override + public synchronized List determineNewLeasesToCreate(List shards, List currentLeases, + InitialPositionInStreamExtended initialPosition, Set inconsistentShardIds, + HierarchicalShardSyncer.MultiStreamArgs multiStreamArgs) { + final Map shardIdToNewLeaseMap = new HashMap<>(); + final Map shardIdToShardMapOfAllKinesisShards = constructShardIdToShardMap(shards); + + final Set shardIdsOfCurrentLeases = currentLeases.stream() + .peek(lease -> log.debug("Existing lease: {}", lease)) + .map(lease -> shardIdFromLeaseDeducer.apply(lease, multiStreamArgs)) + .collect(Collectors.toSet()); + + final List openShards = HierarchicalShardSyncer.getOpenShards(shards); + final Map memoizationContext = new HashMap<>(); + + // Iterate over the open shards and find those that don't have any lease entries. + for (Shard shard : openShards) { + final String shardId = shard.shardId(); + log.debug("Evaluating leases for open shard {} and its ancestors.", shardId); + if (shardIdsOfCurrentLeases.contains(shardId)) { + log.debug("Lease for shardId {} already exists. Not creating a lease", shardId); + } else if (inconsistentShardIds.contains(shardId)) { + log.info("shardId {} is an inconsistent child. Not creating a lease", shardId); + } else { + log.debug("Need to create a lease for shardId {}", shardId); + final Lease newLease = multiStreamArgs.isMultiStreamMode() ? 
+ HierarchicalShardSyncer.newKCLMultiStreamLease(shard, multiStreamArgs.streamIdentifier()) : + HierarchicalShardSyncer.newKCLLease(shard); + final boolean isDescendant = checkIfDescendantAndAddNewLeasesForAncestors(shardId, initialPosition, + shardIdsOfCurrentLeases, shardIdToShardMapOfAllKinesisShards, shardIdToNewLeaseMap, + memoizationContext, multiStreamArgs); + + /** + * If the shard is a descendant and the specified initial position is AT_TIMESTAMP, then the + * checkpoint should be set to AT_TIMESTAMP, else to TRIM_HORIZON. For AT_TIMESTAMP, we will add a + * lease just like we do for TRIM_HORIZON. However we will only return back records with server-side + * timestamp at or after the specified initial position timestamp. + * + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5 - shards till epoch 102 + * \ / \ / | | + * 6 7 4 5 - shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * + * Current leases: empty set + * + * For the above example, suppose the initial position in stream is set to AT_TIMESTAMP with + * timestamp value 206. We will then create new leases for all the shards (with checkpoint set to + * AT_TIMESTAMP), including the ancestor shards with epoch less than 206. However as we begin + * processing the ancestor shards, their checkpoints would be updated to SHARD_END and their leases + * would then be deleted since they won't have records with server-side timestamp at/after 206. And + * after that we will begin processing the descendant shards with epoch at/after 206 and we will + * return the records that meet the timestamp requirement for these shards. 
+ */ + if (isDescendant + && !initialPosition.getInitialPositionInStream().equals(InitialPositionInStream.AT_TIMESTAMP)) { + newLease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); + } else { + newLease.checkpoint(HierarchicalShardSyncer.convertToCheckpoint(initialPosition)); + } + log.debug("Set checkpoint of {} to {}", newLease.leaseKey(), newLease.checkpoint()); + shardIdToNewLeaseMap.put(shardId, newLease); + } + } + + final List newLeasesToCreate = new ArrayList<>(shardIdToNewLeaseMap.values()); + final Comparator startingSequenceNumberComparator = new HierarchicalShardSyncer.StartingSequenceNumberAndShardIdBasedComparator( + shardIdToShardMapOfAllKinesisShards, multiStreamArgs); + newLeasesToCreate.sort(startingSequenceNumberComparator); + return newLeasesToCreate; + } + + @Override + public synchronized void cleanupGarbageLeases(List shards, LeaseRefresher leaseRefresher, + List trackedLeases, HierarchicalShardSyncer.MultiStreamArgs multiStreamArgs) + throws DependencyException, ProvisionedThroughputException, InvalidStateException { + final Set kinesisShards = shards.stream().map(Shard::shardId).collect(Collectors.toSet()); + + // Check if there are leases for non-existent shards + final List garbageLeases = trackedLeases.stream() + .filter(lease -> HierarchicalShardSyncer.isCandidateForCleanup(lease, kinesisShards, multiStreamArgs)).collect(Collectors.toList()); + + if (!CollectionUtils.isNullOrEmpty(garbageLeases)) { + log.info("Found {} candidate leases for cleanup. 
Refreshing list of" + + " Kinesis shards to pick up recent/latest shards", garbageLeases.size()); + final Set currentKinesisShardIds = HierarchicalShardSyncer.getFullShardList(shardDetector).stream().map(Shard::shardId) + .collect(Collectors.toSet()); + + for (Lease lease : garbageLeases) { + if (HierarchicalShardSyncer.isCandidateForCleanup(lease, currentKinesisShardIds, multiStreamArgs)) { + log.info("Deleting lease for shard {} as it is not present in Kinesis stream.", lease.leaseKey()); + leaseRefresher.deleteLease(lease); + } + } + } + } + + @Override + public synchronized void cleanupLeasesOfFinishedShards(LeaseRefresher leaseRefresher, List currentLeases, + List trackedLeases, HierarchicalShardSyncer.MultiStreamArgs multiStreamArgs) + throws DependencyException, ProvisionedThroughputException, InvalidStateException { + final List leasesOfClosedShards = currentLeases.stream() + .filter(lease -> lease.checkpoint().equals(ExtendedSequenceNumber.SHARD_END)) + .collect(Collectors.toList()); + final Set shardIdsOfClosedShards = leasesOfClosedShards.stream() + .map(lease -> shardIdFromLeaseDeducer.apply(lease, multiStreamArgs)).collect(Collectors.toSet()); + + if (!CollectionUtils.isNullOrEmpty(leasesOfClosedShards)) { + assertClosedShardsAreCoveredOrAbsent(shardIdToShardMap, shardIdToChildShardIdsMap, shardIdsOfClosedShards); + Comparator startingSequenceNumberComparator = + new HierarchicalShardSyncer.StartingSequenceNumberAndShardIdBasedComparator(shardIdToShardMap, multiStreamArgs); + leasesOfClosedShards.sort(startingSequenceNumberComparator); + final Map trackedLeaseMap = trackedLeases.stream() + .collect(Collectors.toMap(lease -> shardIdFromLeaseDeducer.apply(lease, multiStreamArgs), Function.identity())); + + for (Lease leaseOfClosedShard : leasesOfClosedShards) { + final String closedShardId = shardIdFromLeaseDeducer.apply(leaseOfClosedShard, multiStreamArgs); + final Set childShardIds = shardIdToChildShardIdsMap.get(closedShardId); + if (closedShardId != 
null && !CollectionUtils.isNullOrEmpty(childShardIds)) { + cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseRefresher, multiStreamArgs); + } + } + } + } + + /** + * Note: this has package level access for testing purposes. + * Useful for asserting that we don't have an incomplete shard list following a reshard operation. + * We verify that if the shard is present in the shard list, it is closed and its hash key range + * is covered by its child shards. + * @param shardIdsOfClosedShards Ids of the shards which are expected to be closed + * @throws KinesisClientLibIOException if a listed shard is still open, has no child shards, or its hash key + * range is not covered by its child shards + */ + synchronized void assertClosedShardsAreCoveredOrAbsent(final Map shardIdToShardMap, + final Map> shardIdToChildShardIdsMap, + final Set shardIdsOfClosedShards) + throws KinesisClientLibIOException { + final String exceptionMessageSuffix = "This can happen if we constructed the list of shards " + + "while a reshard operation was in progress."; + + for (String shardId : shardIdsOfClosedShards) { + final Shard shard = shardIdToShardMap.get(shardId); + if (shard == null) { + log.info("Shard {} is not present in Kinesis anymore.", shardId); + continue; + } + + final String endingSequenceNumber = shard.sequenceNumberRange().endingSequenceNumber(); + if (endingSequenceNumber == null) { + throw new KinesisClientLibIOException("Shard " + shardId + + " is not closed. " + exceptionMessageSuffix); + } + + final Set childShardIds = shardIdToChildShardIdsMap.get(shardId); + if (childShardIds == null) { + throw new KinesisClientLibIOException("Incomplete shard list: Closed shard " + shardId + + " has no children. "
+ exceptionMessageSuffix); + } + + assertHashRangeOfClosedShardIsCovered(shard, shardIdToShardMap, childShardIds); + } + } + + private synchronized void assertHashRangeOfClosedShardIsCovered(final Shard closedShard, + final Map shardIdToShardMap, final Set childShardIds) + throws KinesisClientLibIOException { + BigInteger minStartingHashKeyOfChildren = null; + BigInteger maxEndingHashKeyOfChildren = null; + + final BigInteger startingHashKeyOfClosedShard = new BigInteger(closedShard.hashKeyRange().startingHashKey()); + final BigInteger endingHashKeyOfClosedShard = new BigInteger(closedShard.hashKeyRange().endingHashKey()); + + for (String childShardId : childShardIds) { + final Shard childShard = shardIdToShardMap.get(childShardId); + final BigInteger startingHashKey = new BigInteger(childShard.hashKeyRange().startingHashKey()); + if (minStartingHashKeyOfChildren == null || startingHashKey.compareTo(minStartingHashKeyOfChildren) < 0) { + minStartingHashKeyOfChildren = startingHashKey; + } + + final BigInteger endingHashKey = new BigInteger(childShard.hashKeyRange().endingHashKey()); + if (maxEndingHashKeyOfChildren == null || endingHashKey.compareTo(maxEndingHashKeyOfChildren) > 0) { + maxEndingHashKeyOfChildren = endingHashKey; + } + } + + if (minStartingHashKeyOfChildren == null || maxEndingHashKeyOfChildren == null + || minStartingHashKeyOfChildren.compareTo(startingHashKeyOfClosedShard) > 0 + || maxEndingHashKeyOfChildren.compareTo(endingHashKeyOfClosedShard) < 0) { + throw new KinesisClientLibIOException(String.format( + "Incomplete shard list: hash key range of shard %s is not covered by its child shards.", + closedShard.shardId())); + } + + } + + /** + * Delete lease for the closed shard. Rules for deletion are: + * a/ the checkpoint for the closed shard is SHARD_END, + * b/ there are leases for all the childShardIds and their checkpoint is NOT TRIM_HORIZON + * Note: This method has package level access solely for testing purposes. 
+ * + * @param closedShardId Identifies the closed shard + * @param childShardIds ShardIds of children of the closed shard + * @param trackedLeases shardId->Lease map with all leases we are tracking (should not be null) + * @param leaseRefresher + * @throws ProvisionedThroughputException + * @throws InvalidStateException + * @throws DependencyException + */ + synchronized void cleanupLeaseForClosedShard(final String closedShardId, final Set childShardIds, + final Map trackedLeases, final LeaseRefresher leaseRefresher, final HierarchicalShardSyncer.MultiStreamArgs multiStreamArgs) + throws DependencyException, InvalidStateException, ProvisionedThroughputException { + final Lease leaseForClosedShard = trackedLeases.get(closedShardId); + final List childShardLeases = childShardIds.stream().map(trackedLeases::get).filter(Objects::nonNull) + .collect(Collectors.toList()); + + if (leaseForClosedShard != null && leaseForClosedShard.checkpoint().equals(ExtendedSequenceNumber.SHARD_END) + && childShardLeases.size() == childShardIds.size()) { + boolean okayToDelete = true; + for (Lease lease : childShardLeases) { + if (lease.checkpoint().equals(ExtendedSequenceNumber.TRIM_HORIZON)) { + okayToDelete = false; + break; + } + } + + if (okayToDelete) { + log.info("Deleting lease for shard {} as it has been completely processed and processing of child " + + "shards has begun.", shardIdFromLeaseDeducer.apply(leaseForClosedShard, multiStreamArgs)); + leaseRefresher.deleteLease(leaseForClosedShard); + } + } + } + +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardDetector.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardDetector.java index 6ae012e6..1b2822ee 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardDetector.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardDetector.java @@ -16,6 +16,7 @@ package software.amazon.kinesis.leases; import 
software.amazon.awssdk.services.kinesis.model.Shard; +import software.amazon.awssdk.services.kinesis.model.ShardFilter; import software.amazon.kinesis.common.StreamIdentifier; import java.util.List; @@ -28,6 +29,8 @@ public interface ShardDetector { List listShards(); + List listShardsWithFilter(ShardFilter shardFilter); + default StreamIdentifier streamIdentifier() { throw new UnsupportedOperationException("StreamName not available"); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java index c59608b2..9016c1aa 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java @@ -64,8 +64,9 @@ public class ShardSyncTask implements ConsumerTask { final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, SHARD_SYNC_TASK_OPERATION); try { - hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, - cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, scope); + hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, + initialPosition, cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, scope); + if (shardSyncTaskIdleTimeMillis > 0) { Thread.sleep(shardSyncTaskIdleTimeMillis); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java index 0cc50c2a..4733d8df 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java @@ -112,9 +112,10 @@ public class HierarchicalShardSyncerTest { 
public void testDetermineNewLeasesToCreateNoShards() { final List shards = Collections.emptyList(); final List leases = Collections.emptyList(); + final LeaseSynchronizer emptyLeaseTableSynchronizer = new EmptyLeaseTableSynchronizer(); - assertThat(HierarchicalShardSyncer.determineNewLeasesToCreate(shards, leases, INITIAL_POSITION_LATEST).isEmpty(), - equalTo(true)); + assertThat(HierarchicalShardSyncer.determineNewLeasesToCreate(emptyLeaseTableSynchronizer, shards, leases, + INITIAL_POSITION_LATEST).isEmpty(), equalTo(true)); } /** @@ -123,10 +124,11 @@ public class HierarchicalShardSyncerTest { @Test public void testDetermineNewLeasesToCreateNoShardsForMultiStream() { final List shards = Collections.emptyList(); final List leases = Collections.emptyList(); + final LeaseSynchronizer emptyLeaseTableSynchronizer = new EmptyLeaseTableSynchronizer(); assertThat(HierarchicalShardSyncer - .determineNewLeasesToCreate(shards, leases, INITIAL_POSITION_LATEST, new HashSet<>(), MULTI_STREAM_ARGS) - .isEmpty(), equalTo(true)); + .determineNewLeasesToCreate(emptyLeaseTableSynchronizer, shards, leases, INITIAL_POSITION_LATEST, + new HashSet<>(), MULTI_STREAM_ARGS).isEmpty(), equalTo(true)); } /** @@ -141,9 +143,10 @@ public class HierarchicalShardSyncerTest { final List shards = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange), ShardObjectHelper.newShard(shardId1, null, null, sequenceRange)); final List currentLeases = Collections.emptyList(); + final LeaseSynchronizer emptyLeaseTableSynchronizer = new EmptyLeaseTableSynchronizer(); - final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, - INITIAL_POSITION_LATEST); + final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(emptyLeaseTableSynchronizer, + shards, currentLeases, INITIAL_POSITION_LATEST); final Set newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set expectedLeaseShardIds = new 
HashSet<>(Arrays.asList(shardId0, shardId1)); @@ -163,9 +166,10 @@ public class HierarchicalShardSyncerTest { final List shards = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange), ShardObjectHelper.newShard(shardId1, null, null, sequenceRange)); final List currentLeases = Collections.emptyList(); + final LeaseSynchronizer emptyLeaseTableSynchronizer = new EmptyLeaseTableSynchronizer(); - final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, - INITIAL_POSITION_LATEST, new HashSet<>(), MULTI_STREAM_ARGS); + final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(emptyLeaseTableSynchronizer, + shards, currentLeases, INITIAL_POSITION_LATEST, new HashSet<>(), MULTI_STREAM_ARGS); final Set newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set expectedLeaseIds = new HashSet<>( toMultiStreamLeaseList(Arrays.asList(shardId0, shardId1))); @@ -174,59 +178,6 @@ public class HierarchicalShardSyncerTest { assertThat(newLeaseKeys, equalTo(expectedLeaseIds)); } - /** - * Test determineNewLeasesToCreate() where there are no leases and no resharding operations have been performed, but - * one of the shards was marked as inconsistent. 
- */ - @Test - public void testDetermineNewLeasesToCreate0Leases0Reshards1Inconsistent() { - final String shardId0 = "shardId-0"; - final String shardId1 = "shardId-1"; - final String shardId2 = "shardId-2"; - final SequenceNumberRange sequenceRange = ShardObjectHelper.newSequenceNumberRange("342980", null); - - final List shards = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange), - ShardObjectHelper.newShard(shardId1, null, null, sequenceRange), - ShardObjectHelper.newShard(shardId2, shardId1, null, sequenceRange)); - final List currentLeases = Collections.emptyList(); - - final Set inconsistentShardIds = new HashSet<>(Collections.singletonList(shardId2)); - - final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, - INITIAL_POSITION_LATEST, inconsistentShardIds); - final Set newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); - final Set expectedLeaseShardIds = new HashSet<>(Arrays.asList(shardId0, shardId1)); - assertThat(newLeases.size(), equalTo(expectedLeaseShardIds.size())); - assertThat(newLeaseKeys, equalTo(expectedLeaseShardIds)); - } - - /** - * Test determineNewLeasesToCreate() where there are no leases and no resharding operations have been performed, but - * one of the shards was marked as inconsistent. 
- */ - @Test - public void testDetermineNewLeasesToCreate0Leases0Reshards1InconsistentMultiStream() { - final String shardId0 = "shardId-0"; - final String shardId1 = "shardId-1"; - final String shardId2 = "shardId-2"; - final SequenceNumberRange sequenceRange = ShardObjectHelper.newSequenceNumberRange("342980", null); - - final List shards = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange), - ShardObjectHelper.newShard(shardId1, null, null, sequenceRange), - ShardObjectHelper.newShard(shardId2, shardId1, null, sequenceRange)); - final List currentLeases = Collections.emptyList(); - - final Set inconsistentShardIds = new HashSet<>(Collections.singletonList(shardId2)); - - final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, - INITIAL_POSITION_LATEST, inconsistentShardIds, MULTI_STREAM_ARGS); - final Set newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); - final Set expectedLeaseShardIds = new HashSet<>( - toMultiStreamLeaseList(Arrays.asList(shardId0, shardId1))); - assertThat(newLeases.size(), equalTo(expectedLeaseShardIds.size())); - assertThat(newLeaseKeys, equalTo(expectedLeaseShardIds)); - } - /** * Test bootstrapShardLeases() starting at TRIM_HORIZON ("beginning" of stream) */ @@ -1069,6 +1020,7 @@ public class HierarchicalShardSyncerTest { final String shardId0 = "shardId-0"; final String shardId1 = "shardId-1"; final List currentLeases = new ArrayList<>(); + final LeaseSynchronizer emptyLeaseTableSynchronizer = new EmptyLeaseTableSynchronizer(); final SequenceNumberRange sequenceRange = ShardObjectHelper.newSequenceNumberRange("342980", null); final List shards = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange), @@ -1080,7 +1032,7 @@ public class HierarchicalShardSyncerTest { final Set expectedLeaseShardIds = new HashSet<>(Arrays.asList(shardId0, shardId1)); for (InitialPositionInStreamExtended initialPosition : 
initialPositions) { - final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, + final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(emptyLeaseTableSynchronizer, shards, currentLeases, initialPosition); assertThat(newLeases.size(), equalTo(2)); @@ -1092,24 +1044,6 @@ public class HierarchicalShardSyncerTest { } } - @Test - public void testDetermineNewLeasesToCreateIgnoreClosedShard() { - final String lastShardId = "shardId-1"; - final List currentLeases = new ArrayList<>(); - - final List shards = Arrays.asList( - ShardObjectHelper.newShard("shardId-0", null, null, - ShardObjectHelper.newSequenceNumberRange("303", "404")), - ShardObjectHelper.newShard(lastShardId, null, null, - ShardObjectHelper.newSequenceNumberRange("405", null))); - - final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, - INITIAL_POSITION_LATEST); - - assertThat(newLeases.size(), equalTo(1)); - assertThat(newLeases.get(0).leaseKey(), equalTo(lastShardId)); - } - // /** // * Test CheckIfDescendantAndAddNewLeasesForAncestors (initial position Latest) // * Shard structure (each level depicts a stream segment): @@ -1125,9 +1059,15 @@ public class HierarchicalShardSyncerTest { final List shards = constructShardListForGraphA(); final List currentLeases = Arrays.asList(newLease("shardId-3"), newLease("shardId-4"), newLease("shardId-5")); + final Map shardIdToShardMap = HierarchicalShardSyncer.constructShardIdToShardMap(shards); + final Map> shardIdToChildShardIdsMap = HierarchicalShardSyncer + .constructShardIdToChildShardIdsMap(shardIdToShardMap); - final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, - INITIAL_POSITION_LATEST); + final LeaseSynchronizer nonEmptyLeaseTableSynchronizer = + new NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap); + + final List newLeases = 
HierarchicalShardSyncer.determineNewLeasesToCreate(nonEmptyLeaseTableSynchronizer, + shards, currentLeases, INITIAL_POSITION_LATEST); final Map expectedShardIdCheckpointMap = new HashMap<>(); expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.TRIM_HORIZON); @@ -1161,8 +1101,15 @@ public class HierarchicalShardSyncerTest { final List currentLeases = Arrays.asList(newLease("shardId-4"), newLease("shardId-5"), newLease("shardId-7")); - final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, - INITIAL_POSITION_LATEST); + final Map shardIdToShardMap = HierarchicalShardSyncer.constructShardIdToShardMap(shards); + final Map> shardIdToChildShardIdsMap = HierarchicalShardSyncer + .constructShardIdToChildShardIdsMap(shardIdToShardMap); + + final LeaseSynchronizer nonEmptyLeaseTableSynchronizer = + new NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap); + + final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(nonEmptyLeaseTableSynchronizer, + shards, currentLeases, INITIAL_POSITION_LATEST); final Map expectedShardIdCheckpointMap = new HashMap<>(); expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.TRIM_HORIZON); @@ -1194,8 +1141,15 @@ public class HierarchicalShardSyncerTest { final List currentLeases = Arrays.asList(newLease("shardId-3"), newLease("shardId-4"), newLease("shardId-5")); - final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, - INITIAL_POSITION_TRIM_HORIZON); + final Map shardIdToShardMap = HierarchicalShardSyncer.constructShardIdToShardMap(shards); + final Map> shardIdToChildShardIdsMap = HierarchicalShardSyncer + .constructShardIdToChildShardIdsMap(shardIdToShardMap); + + final LeaseSynchronizer nonEmptyLeaseTableSynchronizer = + new NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap); + + final List newLeases = 
HierarchicalShardSyncer.determineNewLeasesToCreate(nonEmptyLeaseTableSynchronizer, + shards, currentLeases, INITIAL_POSITION_TRIM_HORIZON); final Set leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final List checkpoints = newLeases.stream().map(Lease::checkpoint) @@ -1229,8 +1183,15 @@ public class HierarchicalShardSyncerTest { final List currentLeases = Arrays.asList(newLease("shardId-4"), newLease("shardId-5"), newLease("shardId-7")); - final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, - INITIAL_POSITION_TRIM_HORIZON); + final Map shardIdToShardMap = HierarchicalShardSyncer.constructShardIdToShardMap(shards); + final Map> shardIdToChildShardIdsMap = HierarchicalShardSyncer + .constructShardIdToChildShardIdsMap(shardIdToShardMap); + + final LeaseSynchronizer nonEmptyLeaseTableSynchronizer = + new NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap); + + final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(nonEmptyLeaseTableSynchronizer, + shards, currentLeases, INITIAL_POSITION_TRIM_HORIZON); final Set leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final List checkpoints = newLeases.stream().map(Lease::checkpoint) @@ -1259,8 +1220,15 @@ public class HierarchicalShardSyncerTest { final List shards = constructShardListForGraphB(); final List currentLeases = new ArrayList<>(); - final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, - INITIAL_POSITION_TRIM_HORIZON); + final Map shardIdToShardMap = HierarchicalShardSyncer.constructShardIdToShardMap(shards); + final Map> shardIdToChildShardIdsMap = HierarchicalShardSyncer + .constructShardIdToChildShardIdsMap(shardIdToShardMap); + + final LeaseSynchronizer nonEmptyLeaseTableSynchronizer = + new NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap); + + final List 
newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(nonEmptyLeaseTableSynchronizer, + shards, currentLeases, INITIAL_POSITION_TRIM_HORIZON); final Set leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final List checkpoints = newLeases.stream().map(Lease::checkpoint) @@ -1294,8 +1262,15 @@ public class HierarchicalShardSyncerTest { final List currentLeases = Arrays.asList(newLease("shardId-3"), newLease("shardId-4"), newLease("shardId-5")); - final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, - INITIAL_POSITION_AT_TIMESTAMP); + final Map shardIdToShardMap = HierarchicalShardSyncer.constructShardIdToShardMap(shards); + final Map> shardIdToChildShardIdsMap = HierarchicalShardSyncer + .constructShardIdToChildShardIdsMap(shardIdToShardMap); + + final LeaseSynchronizer nonEmptyLeaseTableSynchronizer = + new NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap); + + final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(nonEmptyLeaseTableSynchronizer, + shards, currentLeases, INITIAL_POSITION_AT_TIMESTAMP); final Set leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final List checkpoints = newLeases.stream().map(Lease::checkpoint) .collect(Collectors.toList()); @@ -1328,8 +1303,15 @@ public class HierarchicalShardSyncerTest { final List currentLeases = Arrays.asList(newLease("shardId-4"), newLease("shardId-5"), newLease("shardId-7")); - final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, - INITIAL_POSITION_AT_TIMESTAMP); + final Map shardIdToShardMap = HierarchicalShardSyncer.constructShardIdToShardMap(shards); + final Map> shardIdToChildShardIdsMap = HierarchicalShardSyncer + .constructShardIdToChildShardIdsMap(shardIdToShardMap); + + final LeaseSynchronizer nonEmptyLeaseTableSynchronizer = + new NonEmptyLeaseTableSynchronizer(shardDetector, 
shardIdToShardMap, shardIdToChildShardIdsMap); + + final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(nonEmptyLeaseTableSynchronizer, + shards, currentLeases, INITIAL_POSITION_AT_TIMESTAMP); final Set leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final List checkpoints = newLeases.stream().map(Lease::checkpoint) .collect(Collectors.toList()); @@ -1355,8 +1337,15 @@ public class HierarchicalShardSyncerTest { final List shards = constructShardListForGraphB(); final List currentLeases = new ArrayList<>(); - final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, - INITIAL_POSITION_AT_TIMESTAMP); + final Map shardIdToShardMap = HierarchicalShardSyncer.constructShardIdToShardMap(shards); + final Map> shardIdToChildShardIdsMap = HierarchicalShardSyncer + .constructShardIdToChildShardIdsMap(shardIdToShardMap); + + final LeaseSynchronizer nonEmptyLeaseTableSynchronizer = + new NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap); + + final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(nonEmptyLeaseTableSynchronizer, + shards, currentLeases, INITIAL_POSITION_AT_TIMESTAMP); final Set leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final List checkpoints = newLeases.stream().map(Lease::checkpoint) .collect(Collectors.toList()); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java index 6af62edb..d3504bb6 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java @@ -45,6 +45,7 @@ import software.amazon.kinesis.leases.HierarchicalShardSyncer; import software.amazon.kinesis.leases.Lease; import 
software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseRefresher; +import software.amazon.kinesis.leases.LeaseSynchronizer; import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.ShardObjectHelper; diff --git a/pom.xml b/pom.xml index bdd2d0c2..4e8da722 100644 --- a/pom.xml +++ b/pom.xml @@ -33,7 +33,7 @@ - 2.10.66 + 2.10.65-SNAPSHOT