diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java index 68a1701d..5c18ee85 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java @@ -17,15 +17,11 @@ package software.amazon.kinesis.leases; import java.io.Serializable; import java.math.BigInteger; import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.function.BiFunction; @@ -106,9 +102,7 @@ public class HierarchicalShardSyncer { * @param leaseRefresher * @param initialPosition * @param scope - * @param cleanupLeasesOfCompletedShards * @param ignoreUnexpectedChildShards - * @param garbageCollectLeases * @throws DependencyException * @throws InvalidStateException * @throws ProvisionedThroughputException @@ -117,20 +111,18 @@ public class HierarchicalShardSyncer { // CHECKSTYLE:OFF CyclomaticComplexity public synchronized void checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector, final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition, - final MetricsScope scope, final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards, - final boolean garbageCollectLeases, final boolean isLeaseTableEmpty) + final MetricsScope scope, final boolean ignoreUnexpectedChildShards, final boolean isLeaseTableEmpty) throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException, InterruptedException { final List latestShards = isLeaseTableEmpty ? getShardListAtInitialPosition(shardDetector, initialPosition) : getShardList(shardDetector); - checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, latestShards, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, garbageCollectLeases, + checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, latestShards, ignoreUnexpectedChildShards, scope, isLeaseTableEmpty); } //Provide a pre-collcted list of shards to avoid calling ListShards API public synchronized void checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector, final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition, - List latestShards, final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards, - final MetricsScope scope, final boolean garbageCollectLeases, final boolean isLeaseTableEmpty) + List latestShards, final boolean ignoreUnexpectedChildShards, final MetricsScope scope, final boolean isLeaseTableEmpty) throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { //TODO: Need to add multistream support for this https://sim.amazon.com/issues/KinesisLTR-191 @@ -166,13 +158,6 @@ public class HierarchicalShardSyncer { } final List trackedLeases = new ArrayList<>(currentLeases); trackedLeases.addAll(newLeasesToCreate); - if (!isLeaseTableEmpty && garbageCollectLeases) { - cleanupGarbageLeases(shardDetector, latestShards, trackedLeases, leaseRefresher, multiStreamArgs); - } - if (!isLeaseTableEmpty && cleanupLeasesOfCompletedShards) { - cleanupLeasesOfFinishedShards(currentLeases, shardIdToShardMap, shardIdToChildShardIdsMap, trackedLeases, - leaseRefresher, multiStreamArgs); - } } /** Helper method to detect a race condition between fetching the shards via paginated DescribeStream calls @@ -205,44 +190,6 @@ public class HierarchicalShardSyncer { .flatMap(entry -> shardIdToChildShardIdsMap.get(entry.getKey()).stream()).collect(Collectors.toSet()); } - /** - * Note: this has package level access for testing purposes. - * Useful for asserting that we don't have an incomplete shard list following a reshard operation. - * We verify that if the shard is present in the shard list, it is closed and its hash key range - * is covered by its child shards. - * @param shardIdsOfClosedShards Id of the shard which is expected to be closed - * @return ShardIds of child shards (children of the expectedClosedShard) - * @throws KinesisClientLibIOException - */ - synchronized void assertClosedShardsAreCoveredOrAbsent(final Map shardIdToShardMap, - final Map> shardIdToChildShardIdsMap, final Set shardIdsOfClosedShards) - throws KinesisClientLibIOException { - final String exceptionMessageSuffix = "This can happen if we constructed the list of shards " - + " while a reshard operation was in progress."; - - for (String shardId : shardIdsOfClosedShards) { - final Shard shard = shardIdToShardMap.get(shardId); - if (shard == null) { - log.info("{} : Shard {} is not present in Kinesis anymore.", streamIdentifier, shardId); - continue; - } - - final String endingSequenceNumber = shard.sequenceNumberRange().endingSequenceNumber(); - if (endingSequenceNumber == null) { - throw new KinesisClientLibIOException("Shard " + shardIdsOfClosedShards - + " is not closed. " + exceptionMessageSuffix); - } - - final Set childShardIds = shardIdToChildShardIdsMap.get(shardId); - if (childShardIds == null) { - throw new KinesisClientLibIOException("Incomplete shard list: Closed shard " + shardId - + " has no children." + exceptionMessageSuffix); - } - - assertHashRangeOfClosedShardIsCovered(shard, shardIdToShardMap, childShardIds); - } - } - private synchronized void assertHashRangeOfClosedShardIsCovered(final Shard closedShard, final Map shardIdToShardMap, final Set childShardIds) throws KinesisClientLibIOException { @@ -617,177 +564,6 @@ public class HierarchicalShardSyncer { return parentShardIds; } - /** - * Delete leases corresponding to shards that no longer exist in the stream. Current scheme: Delete a lease if: - *
    - *
  • The corresponding shard is not present in the list of Kinesis shards
  • - *
  • The parentShardIds listed in the lease are also not present in the list of Kinesis shards.
  • - *
- * - * @param shards - * List of all Kinesis shards (assumed to be a consistent snapshot - when stream is in Active state). - * @param trackedLeases - * List of - * @param leaseRefresher - * @throws KinesisClientLibIOException - * Thrown if we couldn't get a fresh shard list from Kinesis. - * @throws ProvisionedThroughputException - * @throws InvalidStateException - * @throws DependencyException - */ - private static void cleanupGarbageLeases(@NonNull final ShardDetector shardDetector, final List shards, - final List trackedLeases, final LeaseRefresher leaseRefresher, - final MultiStreamArgs multiStreamArgs) throws KinesisClientLibIOException, - DependencyException, InvalidStateException, ProvisionedThroughputException { - final String streamIdentifier = getStreamIdentifier(multiStreamArgs); - final Set kinesisShards = shards.stream().map(Shard::shardId).collect(Collectors.toSet()); - - // Check if there are leases for non-existent shards - final List garbageLeases = trackedLeases.stream() - .filter(lease -> isCandidateForCleanup(lease, kinesisShards, multiStreamArgs)).collect(Collectors.toList()); - - if (!CollectionUtils.isNullOrEmpty(garbageLeases)) { - log.info("{} : Found {} candidate leases for cleanup. Refreshing list of" - + " Kinesis shards to pick up recent/latest shards", streamIdentifier, garbageLeases.size()); - final Set currentKinesisShardIds = getShardList(shardDetector).stream().map(Shard::shardId) - .collect(Collectors.toSet()); - - for (Lease lease : garbageLeases) { - if (isCandidateForCleanup(lease, currentKinesisShardIds, multiStreamArgs)) { - log.info("{} : Deleting lease for shard {} as it is not present in Kinesis stream.", - streamIdentifier, lease.leaseKey()); - leaseRefresher.deleteLease(lease); - } - } - } - } - - /** - * Note: This method has package level access, solely for testing purposes. - * - * @param lease Candidate shard we are considering for deletion. - * @param currentKinesisShardIds - * @return true if neither the shard (corresponding to the lease), nor its parents are present in - * currentKinesisShardIds - * @throws KinesisClientLibIOException Thrown if currentKinesisShardIds contains a parent shard but not the child - * shard (we are evaluating for deletion). - */ - static boolean isCandidateForCleanup(final Lease lease, final Set currentKinesisShardIds, - final MultiStreamArgs multiStreamArgs) - throws KinesisClientLibIOException { - - final String streamIdentifier = getStreamIdentifier(multiStreamArgs); - - boolean isCandidateForCleanup = true; - final String shardId = shardIdFromLeaseDeducer.apply(lease, multiStreamArgs); - - if (currentKinesisShardIds.contains(shardId)) { - isCandidateForCleanup = false; - } else { - log.info("{} : Found lease for non-existent shard: {}. Checking its parent shards", streamIdentifier, shardId); - final Set parentShardIds = lease.parentShardIds(); - for (String parentShardId : parentShardIds) { - - // Throw an exception if the parent shard exists (but the child does not). - // This may be a (rare) race condition between fetching the shard list and Kinesis expiring shards. - if (currentKinesisShardIds.contains(parentShardId)) { - final String message = String.format("Parent shard %s exists but not the child shard %s", - parentShardId, shardId); - log.info("{} : {}", streamIdentifier, message); - throw new KinesisClientLibIOException(message); - } - } - } - - return isCandidateForCleanup; - } - - /** - * Private helper method. - * Clean up leases for shards that meet the following criteria: - * a/ the shard has been fully processed (checkpoint is set to SHARD_END) - * b/ we've begun processing all the child shards: we have leases for all child shards and their checkpoint is not - * TRIM_HORIZON. - * - * @param currentLeases List of leases we evaluate for clean up - * @param shardIdToShardMap Map of shardId->Shard (assumed to include all Kinesis shards) - * @param shardIdToChildShardIdsMap Map of shardId->childShardIds (assumed to include all Kinesis shards) - * @param trackedLeases List of all leases we are tracking. - * @param leaseRefresher Lease refresher (will be used to delete leases) - * @throws DependencyException - * @throws InvalidStateException - * @throws ProvisionedThroughputException - * @throws KinesisClientLibIOException - */ - private synchronized void cleanupLeasesOfFinishedShards(final Collection currentLeases, - final Map shardIdToShardMap, final Map> shardIdToChildShardIdsMap, - final List trackedLeases, final LeaseRefresher leaseRefresher, - final MultiStreamArgs multiStreamArgs) throws DependencyException, - InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { - final List leasesOfClosedShards = currentLeases.stream() - .filter(lease -> lease.checkpoint().equals(ExtendedSequenceNumber.SHARD_END)) - .collect(Collectors.toList()); - final Set shardIdsOfClosedShards = leasesOfClosedShards.stream() - .map(lease -> shardIdFromLeaseDeducer.apply(lease, multiStreamArgs)).collect(Collectors.toSet()); - - if (!CollectionUtils.isNullOrEmpty(leasesOfClosedShards)) { - assertClosedShardsAreCoveredOrAbsent(shardIdToShardMap, shardIdToChildShardIdsMap, shardIdsOfClosedShards); - //TODO: Verify before LTR launch that ending sequence number is still returned from the service. - Comparator startingSequenceNumberComparator = new StartingSequenceNumberAndShardIdBasedComparator( - shardIdToShardMap, multiStreamArgs); - leasesOfClosedShards.sort(startingSequenceNumberComparator); - final Map trackedLeaseMap = trackedLeases.stream() - .collect(Collectors.toMap(lease -> shardIdFromLeaseDeducer.apply(lease, multiStreamArgs), Function.identity())); - - for (Lease leaseOfClosedShard : leasesOfClosedShards) { - final String closedShardId = shardIdFromLeaseDeducer.apply(leaseOfClosedShard, multiStreamArgs); - final Set childShardIds = shardIdToChildShardIdsMap.get(closedShardId); - if (closedShardId != null && !CollectionUtils.isNullOrEmpty(childShardIds)) { - cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseRefresher, multiStreamArgs); - } - } - } - } - - /** - * Delete lease for the closed shard. Rules for deletion are: - * a/ the checkpoint for the closed shard is SHARD_END, - * b/ there are leases for all the childShardIds and their checkpoint is NOT TRIM_HORIZON - * Note: This method has package level access solely for testing purposes. - * - * @param closedShardId Identifies the closed shard - * @param childShardIds ShardIds of children of the closed shard - * @param trackedLeases shardId->Lease map with all leases we are tracking (should not be null) - * @param leaseRefresher - * @throws ProvisionedThroughputException - * @throws InvalidStateException - * @throws DependencyException - */ - synchronized void cleanupLeaseForClosedShard(final String closedShardId, final Set childShardIds, - final Map trackedLeases, final LeaseRefresher leaseRefresher, final MultiStreamArgs multiStreamArgs) - throws DependencyException, InvalidStateException, ProvisionedThroughputException { - final Lease leaseForClosedShard = trackedLeases.get(closedShardId); - final List childShardLeases = childShardIds.stream().map(trackedLeases::get).filter(Objects::nonNull) - .collect(Collectors.toList()); - - if (leaseForClosedShard != null && leaseForClosedShard.checkpoint().equals(ExtendedSequenceNumber.SHARD_END) - && childShardLeases.size() == childShardIds.size()) { - boolean okayToDelete = true; - for (Lease lease : childShardLeases) { - if (lease.checkpoint().equals(ExtendedSequenceNumber.TRIM_HORIZON)) { - okayToDelete = false; - break; - } - } - - if (okayToDelete) { - log.info("{} : Deleting lease for shard {} as it has been completely processed and processing of child " - + "shards has begun.", streamIdentifier, shardIdFromLeaseDeducer.apply(leaseForClosedShard, multiStreamArgs)); - leaseRefresher.deleteLease(leaseForClosedShard); - } - } - } - public synchronized Lease createLeaseForChildShard(final ChildShard childShard, final StreamIdentifier streamIdentifier) throws InvalidStateException { final MultiStreamArgs multiStreamArgs = new MultiStreamArgs(isMultiStreamMode, streamIdentifier); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java index f5c7ab8a..820d4528 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java @@ -68,7 +68,7 @@ public class ShardSyncTask implements ConsumerTask { try { hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, - initialPosition, scope, garbageCollectLeases, ignoreUnexpectedChildShards, cleanupLeasesUponShardCompletion, + initialPosition, scope, ignoreUnexpectedChildShards, leaseRefresher.isLeaseTableEmpty()); if (shardSyncTaskIdleTimeMillis > 0) { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/ShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/ShardSyncer.java index 182854ff..f7ec12c5 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/ShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/ShardSyncer.java @@ -27,7 +27,6 @@ public class ShardSyncer { * @param shardDetector * @param leaseRefresher * @param initialPosition - * @param cleanupLeasesOfCompletedShards * @param ignoreUnexpectedChildShards * @param scope * @throws DependencyException @@ -38,10 +37,9 @@ public class ShardSyncer { @Deprecated public static synchronized void checkAndCreateLeasesForNewShards(@NonNull final ShardDetector shardDetector, final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition, - final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards, - final MetricsScope scope) throws DependencyException, InvalidStateException, ProvisionedThroughputException, - KinesisClientLibIOException, InterruptedException { + final boolean ignoreUnexpectedChildShards, final MetricsScope scope) + throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException, InterruptedException { HIERARCHICAL_SHARD_SYNCER.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, - scope, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, leaseRefresher.isLeaseTableEmpty()); + scope, ignoreUnexpectedChildShards, leaseRefresher.isLeaseTableEmpty()); } } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java index 1be28b1d..c390987c 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java @@ -89,8 +89,6 @@ public class HierarchicalShardSyncerTest { private static final HierarchicalShardSyncer.MultiStreamArgs MULTI_STREAM_ARGS = new HierarchicalShardSyncer.MultiStreamArgs( MULTISTREAM_MODE_ON, StreamIdentifier.multiStreamInstance(STREAM_IDENTIFIER)); - private final boolean garbageCollectLeases = true; - private final boolean cleanupLeasesOfCompletedShards = true; private final boolean ignoreUnexpectedChildShards = false; private HierarchicalShardSyncer hierarchicalShardSyncer; @@ -302,7 +300,7 @@ public class HierarchicalShardSyncerTest { hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, - SCOPE, cleanupLeasesOfCompletedShards, false, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); + SCOPE, false, dynamoDBLeaseRefresher.isLeaseTableEmpty()); final Set expectedShardIds = new HashSet<>( Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10")); @@ -337,7 +335,7 @@ public class HierarchicalShardSyncerTest { setupMultiStream(); hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, - SCOPE, cleanupLeasesOfCompletedShards, false, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); + SCOPE, false, dynamoDBLeaseRefresher.isLeaseTableEmpty()); final Set expectedShardIds = new HashSet<>( toMultiStreamLeaseList(Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10"))); @@ -380,7 +378,7 @@ public class HierarchicalShardSyncerTest { hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, - latestShards, cleanupLeasesOfCompletedShards, false, SCOPE, garbageCollectLeases, + latestShards, false, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); final Set expectedShardIds = new HashSet<>( @@ -419,7 +417,7 @@ public class HierarchicalShardSyncerTest { setupMultiStream(); hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, - latestShards, cleanupLeasesOfCompletedShards, false, SCOPE, garbageCollectLeases, + latestShards, false, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); final Set expectedShardIds = new HashSet<>( @@ -456,7 +454,7 @@ public class HierarchicalShardSyncerTest { hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, - new ArrayList(), cleanupLeasesOfCompletedShards, false, SCOPE, garbageCollectLeases, + new ArrayList(), false, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); final Set expectedShardIds = new HashSet<>(); @@ -663,7 +661,7 @@ public class HierarchicalShardSyncerTest { try { hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, - INITIAL_POSITION_TRIM_HORIZON, SCOPE, cleanupLeasesOfCompletedShards, false, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); + INITIAL_POSITION_TRIM_HORIZON, SCOPE, false, dynamoDBLeaseRefresher.isLeaseTableEmpty()); } finally { verify(shardDetector).listShards(); verify(dynamoDBLeaseRefresher, never()).listLeases(); @@ -683,7 +681,7 @@ public class HierarchicalShardSyncerTest { setupMultiStream(); try { hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, - INITIAL_POSITION_TRIM_HORIZON, SCOPE, cleanupLeasesOfCompletedShards, false, garbageCollectLeases, + INITIAL_POSITION_TRIM_HORIZON, SCOPE, false, dynamoDBLeaseRefresher.isLeaseTableEmpty()); } finally { verify(shardDetector).listShards(); @@ -719,7 +717,7 @@ public class HierarchicalShardSyncerTest { hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, - SCOPE, cleanupLeasesOfCompletedShards, true, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); + SCOPE, true, dynamoDBLeaseRefresher.isLeaseTableEmpty()); final List leases = leaseCaptor.getAllValues(); final Set leaseKeys = leases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); @@ -764,7 +762,7 @@ public class HierarchicalShardSyncerTest { setupMultiStream(); hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, - SCOPE, cleanupLeasesOfCompletedShards, true, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); + SCOPE, true, dynamoDBLeaseRefresher.isLeaseTableEmpty()); final List leases = leaseCaptor.getAllValues(); final Set leaseKeys = leases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); @@ -820,7 +818,7 @@ public class HierarchicalShardSyncerTest { // Initial call: No leases present, create leases. hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, - SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); + SCOPE, ignoreUnexpectedChildShards, dynamoDBLeaseRefresher.isLeaseTableEmpty()); final Set createLeases = new HashSet<>(leaseCreateCaptor.getAllValues()); @@ -833,126 +831,19 @@ public class HierarchicalShardSyncerTest { verify(dynamoDBLeaseRefresher, times(expectedCreateLeases.size())).createLeaseIfNotExists(any(Lease.class)); verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); - // Second call: Leases present, with shardId-0 being at ShardEnd causing cleanup. + // Second call: Leases present, no leases should be deleted. hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, - SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); + SCOPE, ignoreUnexpectedChildShards, dynamoDBLeaseRefresher.isLeaseTableEmpty()); final List deleteLeases = leaseDeleteCaptor.getAllValues(); final Set shardIds = deleteLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set sequenceNumbers = deleteLeases.stream().map(Lease::checkpoint) .collect(Collectors.toSet()); - final Set expectedShardIds = new HashSet<>(Collections.singletonList(String.format(shardIdPrefix, 0))); - final Set expectedSequenceNumbers = new HashSet<>( - Collections.singletonList(ExtendedSequenceNumber.SHARD_END)); - - assertThat(deleteLeases.size(), equalTo(1)); - assertThat(shardIds, equalTo(expectedShardIds)); - assertThat(sequenceNumbers, equalTo(expectedSequenceNumbers)); + assertThat(deleteLeases.size(), equalTo(0)); verify(shardDetector, times(2)).listShards(); verify(dynamoDBLeaseRefresher, times(expectedCreateLeases.size())).createLeaseIfNotExists(any(Lease.class)); verify(dynamoDBLeaseRefresher, times(2)).listLeases(); - verify(dynamoDBLeaseRefresher, times(1)).deleteLease(any(Lease.class)); - } - - @Test(expected = DependencyException.class) - public void testCheckAndCreateLeasesForNewShardsAtTrimHorizonAndClosedShardWithDeleteLeaseExceptions() - throws Exception { - testCheckAndCreateLeasesForNewShardsAndClosedShardWithDeleteLeaseExceptions(ExtendedSequenceNumber.TRIM_HORIZON, - INITIAL_POSITION_TRIM_HORIZON); - } - - @Test(expected = DependencyException.class) - public void testCheckAndCreateLeasesForNewShardsAtTimestampAndClosedShardWithDeleteLeaseExceptions() - throws Exception { - testCheckAndCreateLeasesForNewShardsAndClosedShardWithDeleteLeaseExceptions(ExtendedSequenceNumber.AT_TIMESTAMP, - INITIAL_POSITION_AT_TIMESTAMP); - } - - private void testCheckAndCreateLeasesForNewShardsAndClosedShardWithDeleteLeaseExceptions( - final ExtendedSequenceNumber sequenceNumber, - final InitialPositionInStreamExtended position) - throws Exception { - final String shardIdPrefix = "shardId-%d"; - final List shards = constructShardListForGraphA(); - final List leases = createLeasesFromShards(shards, sequenceNumber, LEASE_OWNER); - - // Marking shardId-0 as ShardEnd. - leases.stream().filter(lease -> String.format(shardIdPrefix, 0).equals(lease.leaseKey())).findFirst() - .ifPresent(lease -> lease.checkpoint(ExtendedSequenceNumber.SHARD_END)); - - // Marking child of shardId-0 to be processed and not at TRIM_HORIZON. - leases.stream().filter(lease -> String.format(shardIdPrefix, 6).equals(lease.leaseKey())).findFirst() - .ifPresent(lease -> lease.checkpoint(new ExtendedSequenceNumber("1"))); - - final ArgumentCaptor leaseCreateCaptor = ArgumentCaptor.forClass(Lease.class); - final ArgumentCaptor leaseDeleteCaptor = ArgumentCaptor.forClass(Lease.class); - - when(shardDetector.listShards()).thenReturn(shards); - when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList()).thenReturn(leases); - when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCreateCaptor.capture())).thenReturn(true); - doThrow(new DependencyException(new Throwable("Throw for DeleteLease"))).doNothing() - .when(dynamoDBLeaseRefresher).deleteLease(leaseDeleteCaptor.capture()); - - // Initial call: Call to create leases. - hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, - SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); - - final Set createLeases = new HashSet<>(leaseCreateCaptor.getAllValues()); - - final Set expectedCreateLeases = getExpectedLeasesForGraphA(shards, sequenceNumber, position); - - assertThat(createLeases, equalTo(expectedCreateLeases)); - - verify(shardDetector, times(1)).listShards(); - verify(dynamoDBLeaseRefresher, times(1)).listLeases(); - verify(dynamoDBLeaseRefresher, times(expectedCreateLeases.size())).createLeaseIfNotExists(any(Lease.class)); - verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); - - try { - // Second call: Leases already present. ShardId-0 is at ShardEnd and needs to be cleaned up. Delete fails. - hierarchicalShardSyncer - .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, - SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); - } finally { - List deleteLeases = leaseDeleteCaptor.getAllValues(); - Set shardIds = deleteLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); - Set sequenceNumbers = deleteLeases.stream().map(Lease::checkpoint) - .collect(Collectors.toSet()); - - final Set expectedShardIds = new HashSet<>( - Collections.singletonList(String.format(shardIdPrefix, 0))); - final Set expectedSequenceNumbers = new HashSet<>( - Collections.singletonList(ExtendedSequenceNumber.SHARD_END)); - - assertThat(deleteLeases.size(), equalTo(1)); - assertThat(shardIds, equalTo(expectedShardIds)); - assertThat(sequenceNumbers, equalTo(expectedSequenceNumbers)); - - verify(shardDetector, times(2)).listShards(); - verify(dynamoDBLeaseRefresher, times(expectedCreateLeases.size())).createLeaseIfNotExists(any(Lease.class)); - verify(dynamoDBLeaseRefresher, times(2)).listLeases(); - verify(dynamoDBLeaseRefresher, times(1)).deleteLease(any(Lease.class)); - - // Final call: Leases already present. ShardId-0 is at ShardEnd and needs to be cleaned up. Delete passes. - hierarchicalShardSyncer - .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, - SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); - - deleteLeases = leaseDeleteCaptor.getAllValues(); - - shardIds = deleteLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); - sequenceNumbers = deleteLeases.stream().map(Lease::checkpoint).collect(Collectors.toSet()); - - assertThat(deleteLeases.size(), equalTo(2)); - assertThat(shardIds, equalTo(expectedShardIds)); - assertThat(sequenceNumbers, equalTo(expectedSequenceNumbers)); - - verify(shardDetector, times(3)).listShards(); - verify(dynamoDBLeaseRefresher, times(expectedCreateLeases.size())).createLeaseIfNotExists(any(Lease.class)); - verify(dynamoDBLeaseRefresher, times(3)).listLeases(); - verify(dynamoDBLeaseRefresher, times(2)).deleteLease(any(Lease.class)); - } } @Test(expected = DependencyException.class) @@ -985,20 +876,18 @@ public class HierarchicalShardSyncerTest { .ifPresent(lease -> lease.checkpoint(new ExtendedSequenceNumber("1"))); final ArgumentCaptor leaseCreateCaptor = ArgumentCaptor.forClass(Lease.class); - final ArgumentCaptor leaseDeleteCaptor = ArgumentCaptor.forClass(Lease.class); when(shardDetector.listShards()).thenReturn(shards); when(dynamoDBLeaseRefresher.listLeases()) .thenThrow(new DependencyException(new Throwable("Throw for ListLeases"))) .thenReturn(Collections.emptyList()).thenReturn(leases); when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCreateCaptor.capture())).thenReturn(true); - doNothing().when(dynamoDBLeaseRefresher).deleteLease(leaseDeleteCaptor.capture()); try { // Initial call: Call to create leases. Fails on ListLeases hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, - SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); + SCOPE, ignoreUnexpectedChildShards, dynamoDBLeaseRefresher.isLeaseTableEmpty()); } finally { verify(shardDetector, times(1)).listShards(); verify(dynamoDBLeaseRefresher, times(1)).listLeases(); @@ -1008,7 +897,7 @@ public class HierarchicalShardSyncerTest { // Second call: Leases not present, leases will be created. hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, - SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); + SCOPE, ignoreUnexpectedChildShards, dynamoDBLeaseRefresher.isLeaseTableEmpty()); final Set createLeases = new HashSet<>(leaseCreateCaptor.getAllValues()); final Set expectedCreateLeases = getExpectedLeasesForGraphA(shards, sequenceNumber, position); @@ -1023,26 +912,17 @@ public class HierarchicalShardSyncerTest { // Final call: Leases present, belongs to TestOwner, shardId-0 is at ShardEnd should be cleaned up. hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, - SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); - - final List deleteLeases = leaseDeleteCaptor.getAllValues(); - final Set shardIds = deleteLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); - final Set sequenceNumbers = deleteLeases.stream().map(Lease::checkpoint) - .collect(Collectors.toSet()); + SCOPE, ignoreUnexpectedChildShards, dynamoDBLeaseRefresher.isLeaseTableEmpty()); final Set expectedShardIds = new HashSet<>( Collections.singletonList(String.format(shardIdPrefix, 0))); final Set expectedSequenceNumbers = new HashSet<>( Collections.singletonList(ExtendedSequenceNumber.SHARD_END)); - assertThat(deleteLeases.size(), equalTo(1)); - assertThat(shardIds, equalTo(expectedShardIds)); - assertThat(sequenceNumbers, equalTo(expectedSequenceNumbers)); - verify(shardDetector, times(3)).listShards(); verify(dynamoDBLeaseRefresher, times(expectedCreateLeases.size())).createLeaseIfNotExists(any(Lease.class)); verify(dynamoDBLeaseRefresher, times(3)).listLeases(); - verify(dynamoDBLeaseRefresher, times(1)).deleteLease(any(Lease.class)); + verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); } } @@ -1076,20 +956,18 @@ public class HierarchicalShardSyncerTest { .ifPresent(lease -> lease.checkpoint(new ExtendedSequenceNumber("1"))); final ArgumentCaptor leaseCreateCaptor = ArgumentCaptor.forClass(Lease.class); - final ArgumentCaptor leaseDeleteCaptor = ArgumentCaptor.forClass(Lease.class); when(shardDetector.listShards()).thenReturn(shards); when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList()) .thenReturn(Collections.emptyList()).thenReturn(leases); when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCreateCaptor.capture())) .thenThrow(new DependencyException(new Throwable("Throw for CreateLease"))).thenReturn(true); - doNothing().when(dynamoDBLeaseRefresher).deleteLease(leaseDeleteCaptor.capture()); try { // Initial call: No leases present, create leases. Create lease Fails hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, - SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); + SCOPE, ignoreUnexpectedChildShards, dynamoDBLeaseRefresher.isLeaseTableEmpty()); } finally { verify(shardDetector, times(1)).listShards(); verify(dynamoDBLeaseRefresher, times(1)).listLeases(); @@ -1098,7 +976,7 @@ public class HierarchicalShardSyncerTest { hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, - SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); + SCOPE, ignoreUnexpectedChildShards, dynamoDBLeaseRefresher.isLeaseTableEmpty()); final Set createLeases = new HashSet<>(leaseCreateCaptor.getAllValues()); @@ -1114,27 +992,13 @@ public class HierarchicalShardSyncerTest { // Final call: Leases are present, shardId-0 is at ShardEnd needs to be cleaned up. hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, - SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); - - final List deleteLeases = leaseDeleteCaptor.getAllValues(); - final Set shardIds = deleteLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); - final Set sequenceNumbers = deleteLeases.stream().map(Lease::checkpoint) - .collect(Collectors.toSet()); - - final Set expectedShardIds = new HashSet<>( - Collections.singletonList(String.format(shardIdPrefix, 0))); - final Set expectedSequenceNumbers = new HashSet<>( - Collections.singletonList(ExtendedSequenceNumber.SHARD_END)); - - assertThat(deleteLeases.size(), equalTo(1)); - assertThat(shardIds, equalTo(expectedShardIds)); - assertThat(sequenceNumbers, equalTo(expectedSequenceNumbers)); + SCOPE, ignoreUnexpectedChildShards, dynamoDBLeaseRefresher.isLeaseTableEmpty()); verify(shardDetector, times(3)).listShards(); verify(dynamoDBLeaseRefresher, times(1 + expectedCreateLeases.size())) .createLeaseIfNotExists(any(Lease.class)); verify(dynamoDBLeaseRefresher, times(3)).listLeases(); - verify(dynamoDBLeaseRefresher, times(1)).deleteLease(any(Lease.class)); + verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); } } @@ -1186,64 +1050,6 @@ public class HierarchicalShardSyncerTest { }).collect(Collectors.toList()); } - @Test - public void testCleanUpGarbageLeaseForNonExistentShard() throws Exception { - final List shards = constructShardListForGraphA(); - final String garbageShardId = "shardId-garbage-001"; - final Shard garbageShard = ShardObjectHelper.newShard(garbageShardId, null, null, - ShardObjectHelper.newSequenceNumberRange("101", null)); - final Lease garbageLease = createLeaseFromShard(garbageShard, new ExtendedSequenceNumber("99"), LEASE_OWNER); - final List leases = new ArrayList<>( - createLeasesFromShards(shards, ExtendedSequenceNumber.TRIM_HORIZON, LEASE_OWNER)); - leases.add(garbageLease); - - final ArgumentCaptor leaseCaptor = ArgumentCaptor.forClass(Lease.class); - - when(shardDetector.listShards()).thenReturn(shards); - when(dynamoDBLeaseRefresher.listLeases()).thenReturn(leases); - doNothing().when(dynamoDBLeaseRefresher).deleteLease(leaseCaptor.capture()); - - hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, - INITIAL_POSITION_TRIM_HORIZON, SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); - - assertThat(leaseCaptor.getAllValues().size(), equalTo(1)); - assertThat(leaseCaptor.getValue(), equalTo(garbageLease)); - - verify(shardDetector, times(2)).listShards(); - verify(dynamoDBLeaseRefresher).listLeases(); - verify(dynamoDBLeaseRefresher).deleteLease(any(Lease.class)); - verify(dynamoDBLeaseRefresher, never()).createLeaseIfNotExists(any(Lease.class)); - } - - @Test - public void testCleanUpGarbageLeaseForNonExistentShardForMultiStream() throws Exception { - final List shards = constructShardListForGraphA(); - final String garbageShardId = "shardId-garbage-001"; - final Shard garbageShard = ShardObjectHelper.newShard(garbageShardId, null, null, - ShardObjectHelper.newSequenceNumberRange("101", null)); - final Lease garbageLease = createMultiStreamLeaseFromShard(garbageShard, new ExtendedSequenceNumber("99"), LEASE_OWNER); - final List leases = new ArrayList<>( - createMultiStreamLeasesFromShards(shards, ExtendedSequenceNumber.TRIM_HORIZON, LEASE_OWNER)); - leases.add(garbageLease); - - final ArgumentCaptor leaseCaptor = ArgumentCaptor.forClass(Lease.class); - - when(shardDetector.listShards()).thenReturn(shards); - when(dynamoDBLeaseRefresher.listLeasesForStream(any(StreamIdentifier.class))).thenReturn(leases); - doNothing().when(dynamoDBLeaseRefresher).deleteLease(leaseCaptor.capture()); - setupMultiStream(); - hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, - INITIAL_POSITION_TRIM_HORIZON, SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); - - assertThat(leaseCaptor.getAllValues().size(), equalTo(1)); - assertThat(leaseCaptor.getValue(), equalTo(garbageLease)); - - verify(shardDetector, times(2)).listShards(); - verify(dynamoDBLeaseRefresher).listLeasesForStream(any(StreamIdentifier.class)); - verify(dynamoDBLeaseRefresher).deleteLease(any(Lease.class)); - verify(dynamoDBLeaseRefresher, never()).createLeaseIfNotExists(any(Lease.class)); - } - private void testCheckAndCreateLeasesForShardsIfMissing(InitialPositionInStreamExtended initialPosition) throws Exception { final String shardId0 = "shardId-0"; @@ -1279,7 +1085,7 @@ public class HierarchicalShardSyncerTest { hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, initialPosition, - SCOPE, cleanupLeasesOfCompletedShards, false, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); + SCOPE, false, dynamoDBLeaseRefresher.isLeaseTableEmpty()); final List leases = leaseCaptor.getAllValues(); final Set leaseKeys = leases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); @@ -2473,7 +2279,7 @@ public class HierarchicalShardSyncerTest { hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, initialPosition, - SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); + SCOPE, ignoreUnexpectedChildShards, dynamoDBLeaseRefresher.isLeaseTableEmpty()); verify(shardDetector, atLeast(1)).listShardsWithFilter(shardFilter); verify(shardDetector, never()).listShards(); @@ -2495,7 +2301,7 @@ public class HierarchicalShardSyncerTest { hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, - SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty()); + SCOPE, ignoreUnexpectedChildShards, dynamoDBLeaseRefresher.isLeaseTableEmpty()); verify(shardDetector, atLeast(1)).listShards(); } @@ -2518,7 +2324,7 @@ public class HierarchicalShardSyncerTest { try { hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, - SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, + SCOPE, ignoreUnexpectedChildShards, dynamoDBLeaseRefresher.isLeaseTableEmpty()); } finally { verify(shardDetector, times(3)).listShardsWithFilter(any(ShardFilter.class)); // Verify retries. @@ -2547,7 +2353,7 @@ public class HierarchicalShardSyncerTest { hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, - SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, + SCOPE, ignoreUnexpectedChildShards, dynamoDBLeaseRefresher.isLeaseTableEmpty()); verify(shardDetector, times(3)).listShardsWithFilter(any(ShardFilter.class)); // Verify retries. @@ -2570,7 +2376,7 @@ public class HierarchicalShardSyncerTest { hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, - SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, + SCOPE, ignoreUnexpectedChildShards, dynamoDBLeaseRefresher.isLeaseTableEmpty()); verify(shardDetector, times(1)).listShardsWithFilter(any(ShardFilter.class)); // Verify retries.