Removing lease cleanup from HierarchicalShardSyncer (#30)

* Removing lease cleanup from HierarchicalShardSyncer
This commit is contained in:
Joshua Kim 2020-06-11 19:56:04 -04:00 committed by GitHub
parent 2e2211a9b7
commit c8422745d9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 34 additions and 454 deletions

View file

@ -17,15 +17,11 @@ package software.amazon.kinesis.leases;
import java.io.Serializable;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiFunction;
@ -106,9 +102,7 @@ public class HierarchicalShardSyncer {
* @param leaseRefresher
* @param initialPosition
* @param scope
* @param cleanupLeasesOfCompletedShards
* @param ignoreUnexpectedChildShards
* @param garbageCollectLeases
* @throws DependencyException
* @throws InvalidStateException
* @throws ProvisionedThroughputException
@ -117,20 +111,18 @@ public class HierarchicalShardSyncer {
// CHECKSTYLE:OFF CyclomaticComplexity
public synchronized void checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector,
final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition,
final MetricsScope scope, final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards,
final boolean garbageCollectLeases, final boolean isLeaseTableEmpty)
final MetricsScope scope, final boolean ignoreUnexpectedChildShards, final boolean isLeaseTableEmpty)
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException, InterruptedException {
final List<Shard> latestShards = isLeaseTableEmpty ?
getShardListAtInitialPosition(shardDetector, initialPosition) : getShardList(shardDetector);
checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, latestShards, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, garbageCollectLeases,
checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, latestShards, ignoreUnexpectedChildShards, scope,
isLeaseTableEmpty);
}
//Provide a pre-collcted list of shards to avoid calling ListShards API
public synchronized void checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector,
final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition,
List<Shard> latestShards, final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards,
final MetricsScope scope, final boolean garbageCollectLeases, final boolean isLeaseTableEmpty)
List<Shard> latestShards, final boolean ignoreUnexpectedChildShards, final MetricsScope scope, final boolean isLeaseTableEmpty)
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
//TODO: Need to add multistream support for this https://sim.amazon.com/issues/KinesisLTR-191
@ -166,13 +158,6 @@ public class HierarchicalShardSyncer {
}
final List<Lease> trackedLeases = new ArrayList<>(currentLeases);
trackedLeases.addAll(newLeasesToCreate);
if (!isLeaseTableEmpty && garbageCollectLeases) {
cleanupGarbageLeases(shardDetector, latestShards, trackedLeases, leaseRefresher, multiStreamArgs);
}
if (!isLeaseTableEmpty && cleanupLeasesOfCompletedShards) {
cleanupLeasesOfFinishedShards(currentLeases, shardIdToShardMap, shardIdToChildShardIdsMap, trackedLeases,
leaseRefresher, multiStreamArgs);
}
}
/** Helper method to detect a race condition between fetching the shards via paginated DescribeStream calls
@ -205,44 +190,6 @@ public class HierarchicalShardSyncer {
.flatMap(entry -> shardIdToChildShardIdsMap.get(entry.getKey()).stream()).collect(Collectors.toSet());
}
/**
* Note: this has package level access for testing purposes.
* Useful for asserting that we don't have an incomplete shard list following a reshard operation.
* We verify that if the shard is present in the shard list, it is closed and its hash key range
* is covered by its child shards.
* @param shardIdsOfClosedShards Id of the shard which is expected to be closed
* @return ShardIds of child shards (children of the expectedClosedShard)
* @throws KinesisClientLibIOException
*/
synchronized void assertClosedShardsAreCoveredOrAbsent(final Map<String, Shard> shardIdToShardMap,
final Map<String, Set<String>> shardIdToChildShardIdsMap, final Set<String> shardIdsOfClosedShards)
throws KinesisClientLibIOException {
final String exceptionMessageSuffix = "This can happen if we constructed the list of shards "
+ " while a reshard operation was in progress.";
for (String shardId : shardIdsOfClosedShards) {
final Shard shard = shardIdToShardMap.get(shardId);
if (shard == null) {
log.info("{} : Shard {} is not present in Kinesis anymore.", streamIdentifier, shardId);
continue;
}
final String endingSequenceNumber = shard.sequenceNumberRange().endingSequenceNumber();
if (endingSequenceNumber == null) {
throw new KinesisClientLibIOException("Shard " + shardIdsOfClosedShards
+ " is not closed. " + exceptionMessageSuffix);
}
final Set<String> childShardIds = shardIdToChildShardIdsMap.get(shardId);
if (childShardIds == null) {
throw new KinesisClientLibIOException("Incomplete shard list: Closed shard " + shardId
+ " has no children." + exceptionMessageSuffix);
}
assertHashRangeOfClosedShardIsCovered(shard, shardIdToShardMap, childShardIds);
}
}
private synchronized void assertHashRangeOfClosedShardIsCovered(final Shard closedShard,
final Map<String, Shard> shardIdToShardMap, final Set<String> childShardIds)
throws KinesisClientLibIOException {
@ -617,177 +564,6 @@ public class HierarchicalShardSyncer {
return parentShardIds;
}
/**
* Delete leases corresponding to shards that no longer exist in the stream. Current scheme: Delete a lease if:
* <ul>
* <li>The corresponding shard is not present in the list of Kinesis shards</li>
* <li>The parentShardIds listed in the lease are also not present in the list of Kinesis shards.</li>
* </ul>
*
* @param shards
* List of all Kinesis shards (assumed to be a consistent snapshot - when stream is in Active state).
* @param trackedLeases
* List of
* @param leaseRefresher
* @throws KinesisClientLibIOException
* Thrown if we couldn't get a fresh shard list from Kinesis.
* @throws ProvisionedThroughputException
* @throws InvalidStateException
* @throws DependencyException
*/
private static void cleanupGarbageLeases(@NonNull final ShardDetector shardDetector, final List<Shard> shards,
final List<Lease> trackedLeases, final LeaseRefresher leaseRefresher,
final MultiStreamArgs multiStreamArgs) throws KinesisClientLibIOException,
DependencyException, InvalidStateException, ProvisionedThroughputException {
final String streamIdentifier = getStreamIdentifier(multiStreamArgs);
final Set<String> kinesisShards = shards.stream().map(Shard::shardId).collect(Collectors.toSet());
// Check if there are leases for non-existent shards
final List<Lease> garbageLeases = trackedLeases.stream()
.filter(lease -> isCandidateForCleanup(lease, kinesisShards, multiStreamArgs)).collect(Collectors.toList());
if (!CollectionUtils.isNullOrEmpty(garbageLeases)) {
log.info("{} : Found {} candidate leases for cleanup. Refreshing list of"
+ " Kinesis shards to pick up recent/latest shards", streamIdentifier, garbageLeases.size());
final Set<String> currentKinesisShardIds = getShardList(shardDetector).stream().map(Shard::shardId)
.collect(Collectors.toSet());
for (Lease lease : garbageLeases) {
if (isCandidateForCleanup(lease, currentKinesisShardIds, multiStreamArgs)) {
log.info("{} : Deleting lease for shard {} as it is not present in Kinesis stream.",
streamIdentifier, lease.leaseKey());
leaseRefresher.deleteLease(lease);
}
}
}
}
/**
* Note: This method has package level access, solely for testing purposes.
*
* @param lease Candidate shard we are considering for deletion.
* @param currentKinesisShardIds
* @return true if neither the shard (corresponding to the lease), nor its parents are present in
* currentKinesisShardIds
* @throws KinesisClientLibIOException Thrown if currentKinesisShardIds contains a parent shard but not the child
* shard (we are evaluating for deletion).
*/
static boolean isCandidateForCleanup(final Lease lease, final Set<String> currentKinesisShardIds,
final MultiStreamArgs multiStreamArgs)
throws KinesisClientLibIOException {
final String streamIdentifier = getStreamIdentifier(multiStreamArgs);
boolean isCandidateForCleanup = true;
final String shardId = shardIdFromLeaseDeducer.apply(lease, multiStreamArgs);
if (currentKinesisShardIds.contains(shardId)) {
isCandidateForCleanup = false;
} else {
log.info("{} : Found lease for non-existent shard: {}. Checking its parent shards", streamIdentifier, shardId);
final Set<String> parentShardIds = lease.parentShardIds();
for (String parentShardId : parentShardIds) {
// Throw an exception if the parent shard exists (but the child does not).
// This may be a (rare) race condition between fetching the shard list and Kinesis expiring shards.
if (currentKinesisShardIds.contains(parentShardId)) {
final String message = String.format("Parent shard %s exists but not the child shard %s",
parentShardId, shardId);
log.info("{} : {}", streamIdentifier, message);
throw new KinesisClientLibIOException(message);
}
}
}
return isCandidateForCleanup;
}
/**
* Private helper method.
* Clean up leases for shards that meet the following criteria:
* a/ the shard has been fully processed (checkpoint is set to SHARD_END)
* b/ we've begun processing all the child shards: we have leases for all child shards and their checkpoint is not
* TRIM_HORIZON.
*
* @param currentLeases List of leases we evaluate for clean up
* @param shardIdToShardMap Map of shardId->Shard (assumed to include all Kinesis shards)
* @param shardIdToChildShardIdsMap Map of shardId->childShardIds (assumed to include all Kinesis shards)
* @param trackedLeases List of all leases we are tracking.
* @param leaseRefresher Lease refresher (will be used to delete leases)
* @throws DependencyException
* @throws InvalidStateException
* @throws ProvisionedThroughputException
* @throws KinesisClientLibIOException
*/
private synchronized void cleanupLeasesOfFinishedShards(final Collection<Lease> currentLeases,
final Map<String, Shard> shardIdToShardMap, final Map<String, Set<String>> shardIdToChildShardIdsMap,
final List<Lease> trackedLeases, final LeaseRefresher leaseRefresher,
final MultiStreamArgs multiStreamArgs) throws DependencyException,
InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
final List<Lease> leasesOfClosedShards = currentLeases.stream()
.filter(lease -> lease.checkpoint().equals(ExtendedSequenceNumber.SHARD_END))
.collect(Collectors.toList());
final Set<String> shardIdsOfClosedShards = leasesOfClosedShards.stream()
.map(lease -> shardIdFromLeaseDeducer.apply(lease, multiStreamArgs)).collect(Collectors.toSet());
if (!CollectionUtils.isNullOrEmpty(leasesOfClosedShards)) {
assertClosedShardsAreCoveredOrAbsent(shardIdToShardMap, shardIdToChildShardIdsMap, shardIdsOfClosedShards);
//TODO: Verify before LTR launch that ending sequence number is still returned from the service.
Comparator<? super Lease> startingSequenceNumberComparator = new StartingSequenceNumberAndShardIdBasedComparator(
shardIdToShardMap, multiStreamArgs);
leasesOfClosedShards.sort(startingSequenceNumberComparator);
final Map<String, Lease> trackedLeaseMap = trackedLeases.stream()
.collect(Collectors.toMap(lease -> shardIdFromLeaseDeducer.apply(lease, multiStreamArgs), Function.identity()));
for (Lease leaseOfClosedShard : leasesOfClosedShards) {
final String closedShardId = shardIdFromLeaseDeducer.apply(leaseOfClosedShard, multiStreamArgs);
final Set<String> childShardIds = shardIdToChildShardIdsMap.get(closedShardId);
if (closedShardId != null && !CollectionUtils.isNullOrEmpty(childShardIds)) {
cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseRefresher, multiStreamArgs);
}
}
}
}
/**
* Delete lease for the closed shard. Rules for deletion are:
* a/ the checkpoint for the closed shard is SHARD_END,
* b/ there are leases for all the childShardIds and their checkpoint is NOT TRIM_HORIZON
* Note: This method has package level access solely for testing purposes.
*
* @param closedShardId Identifies the closed shard
* @param childShardIds ShardIds of children of the closed shard
* @param trackedLeases shardId->Lease map with all leases we are tracking (should not be null)
* @param leaseRefresher
* @throws ProvisionedThroughputException
* @throws InvalidStateException
* @throws DependencyException
*/
synchronized void cleanupLeaseForClosedShard(final String closedShardId, final Set<String> childShardIds,
final Map<String, Lease> trackedLeases, final LeaseRefresher leaseRefresher, final MultiStreamArgs multiStreamArgs)
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
final Lease leaseForClosedShard = trackedLeases.get(closedShardId);
final List<Lease> childShardLeases = childShardIds.stream().map(trackedLeases::get).filter(Objects::nonNull)
.collect(Collectors.toList());
if (leaseForClosedShard != null && leaseForClosedShard.checkpoint().equals(ExtendedSequenceNumber.SHARD_END)
&& childShardLeases.size() == childShardIds.size()) {
boolean okayToDelete = true;
for (Lease lease : childShardLeases) {
if (lease.checkpoint().equals(ExtendedSequenceNumber.TRIM_HORIZON)) {
okayToDelete = false;
break;
}
}
if (okayToDelete) {
log.info("{} : Deleting lease for shard {} as it has been completely processed and processing of child "
+ "shards has begun.", streamIdentifier, shardIdFromLeaseDeducer.apply(leaseForClosedShard, multiStreamArgs));
leaseRefresher.deleteLease(leaseForClosedShard);
}
}
}
public synchronized Lease createLeaseForChildShard(final ChildShard childShard, final StreamIdentifier streamIdentifier) throws InvalidStateException {
final MultiStreamArgs multiStreamArgs = new MultiStreamArgs(isMultiStreamMode, streamIdentifier);

View file

@ -68,7 +68,7 @@ public class ShardSyncTask implements ConsumerTask {
try {
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher,
initialPosition, scope, garbageCollectLeases, ignoreUnexpectedChildShards, cleanupLeasesUponShardCompletion,
initialPosition, scope, ignoreUnexpectedChildShards,
leaseRefresher.isLeaseTableEmpty());
if (shardSyncTaskIdleTimeMillis > 0) {

View file

@ -27,7 +27,6 @@ public class ShardSyncer {
* @param shardDetector
* @param leaseRefresher
* @param initialPosition
* @param cleanupLeasesOfCompletedShards
* @param ignoreUnexpectedChildShards
* @param scope
* @throws DependencyException
@ -38,10 +37,9 @@ public class ShardSyncer {
@Deprecated
public static synchronized void checkAndCreateLeasesForNewShards(@NonNull final ShardDetector shardDetector,
final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition,
final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards,
final MetricsScope scope) throws DependencyException, InvalidStateException, ProvisionedThroughputException,
KinesisClientLibIOException, InterruptedException {
final boolean ignoreUnexpectedChildShards, final MetricsScope scope)
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException, InterruptedException {
HIERARCHICAL_SHARD_SYNCER.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition,
scope, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, leaseRefresher.isLeaseTableEmpty());
scope, ignoreUnexpectedChildShards, leaseRefresher.isLeaseTableEmpty());
}
}

View file

@ -89,8 +89,6 @@ public class HierarchicalShardSyncerTest {
private static final HierarchicalShardSyncer.MultiStreamArgs MULTI_STREAM_ARGS = new HierarchicalShardSyncer.MultiStreamArgs(
MULTISTREAM_MODE_ON, StreamIdentifier.multiStreamInstance(STREAM_IDENTIFIER));
private final boolean garbageCollectLeases = true;
private final boolean cleanupLeasesOfCompletedShards = true;
private final boolean ignoreUnexpectedChildShards = false;
private HierarchicalShardSyncer hierarchicalShardSyncer;
@ -302,7 +300,7 @@ public class HierarchicalShardSyncerTest {
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
SCOPE, cleanupLeasesOfCompletedShards, false, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
SCOPE, false, dynamoDBLeaseRefresher.isLeaseTableEmpty());
final Set<String> expectedShardIds = new HashSet<>(
Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10"));
@ -337,7 +335,7 @@ public class HierarchicalShardSyncerTest {
setupMultiStream();
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
SCOPE, cleanupLeasesOfCompletedShards, false, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
SCOPE, false, dynamoDBLeaseRefresher.isLeaseTableEmpty());
final Set<String> expectedShardIds = new HashSet<>(
toMultiStreamLeaseList(Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10")));
@ -380,7 +378,7 @@ public class HierarchicalShardSyncerTest {
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
latestShards, cleanupLeasesOfCompletedShards, false, SCOPE, garbageCollectLeases,
latestShards, false, SCOPE,
dynamoDBLeaseRefresher.isLeaseTableEmpty());
final Set<String> expectedShardIds = new HashSet<>(
@ -419,7 +417,7 @@ public class HierarchicalShardSyncerTest {
setupMultiStream();
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
latestShards, cleanupLeasesOfCompletedShards, false, SCOPE, garbageCollectLeases,
latestShards, false, SCOPE,
dynamoDBLeaseRefresher.isLeaseTableEmpty());
final Set<String> expectedShardIds = new HashSet<>(
@ -456,7 +454,7 @@ public class HierarchicalShardSyncerTest {
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
new ArrayList<Shard>(), cleanupLeasesOfCompletedShards, false, SCOPE, garbageCollectLeases,
new ArrayList<Shard>(), false, SCOPE,
dynamoDBLeaseRefresher.isLeaseTableEmpty());
final Set<String> expectedShardIds = new HashSet<>();
@ -663,7 +661,7 @@ public class HierarchicalShardSyncerTest {
try {
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher,
INITIAL_POSITION_TRIM_HORIZON, SCOPE, cleanupLeasesOfCompletedShards, false, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
INITIAL_POSITION_TRIM_HORIZON, SCOPE, false, dynamoDBLeaseRefresher.isLeaseTableEmpty());
} finally {
verify(shardDetector).listShards();
verify(dynamoDBLeaseRefresher, never()).listLeases();
@ -683,7 +681,7 @@ public class HierarchicalShardSyncerTest {
setupMultiStream();
try {
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher,
INITIAL_POSITION_TRIM_HORIZON, SCOPE, cleanupLeasesOfCompletedShards, false, garbageCollectLeases,
INITIAL_POSITION_TRIM_HORIZON, SCOPE, false,
dynamoDBLeaseRefresher.isLeaseTableEmpty());
} finally {
verify(shardDetector).listShards();
@ -719,7 +717,7 @@ public class HierarchicalShardSyncerTest {
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
SCOPE, cleanupLeasesOfCompletedShards, true, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
SCOPE, true, dynamoDBLeaseRefresher.isLeaseTableEmpty());
final List<Lease> leases = leaseCaptor.getAllValues();
final Set<String> leaseKeys = leases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
@ -764,7 +762,7 @@ public class HierarchicalShardSyncerTest {
setupMultiStream();
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
SCOPE, cleanupLeasesOfCompletedShards, true, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
SCOPE, true, dynamoDBLeaseRefresher.isLeaseTableEmpty());
final List<Lease> leases = leaseCaptor.getAllValues();
final Set<String> leaseKeys = leases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
@ -820,7 +818,7 @@ public class HierarchicalShardSyncerTest {
// Initial call: No leases present, create leases.
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
SCOPE, ignoreUnexpectedChildShards, dynamoDBLeaseRefresher.isLeaseTableEmpty());
final Set<Lease> createLeases = new HashSet<>(leaseCreateCaptor.getAllValues());
@ -833,126 +831,19 @@ public class HierarchicalShardSyncerTest {
verify(dynamoDBLeaseRefresher, times(expectedCreateLeases.size())).createLeaseIfNotExists(any(Lease.class));
verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
// Second call: Leases present, with shardId-0 being at ShardEnd causing cleanup.
// Second call: Leases present, no leases should be deleted.
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
SCOPE, ignoreUnexpectedChildShards, dynamoDBLeaseRefresher.isLeaseTableEmpty());
final List<Lease> deleteLeases = leaseDeleteCaptor.getAllValues();
final Set<String> shardIds = deleteLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final Set<ExtendedSequenceNumber> sequenceNumbers = deleteLeases.stream().map(Lease::checkpoint)
.collect(Collectors.toSet());
final Set<String> expectedShardIds = new HashSet<>(Collections.singletonList(String.format(shardIdPrefix, 0)));
final Set<ExtendedSequenceNumber> expectedSequenceNumbers = new HashSet<>(
Collections.singletonList(ExtendedSequenceNumber.SHARD_END));
assertThat(deleteLeases.size(), equalTo(1));
assertThat(shardIds, equalTo(expectedShardIds));
assertThat(sequenceNumbers, equalTo(expectedSequenceNumbers));
assertThat(deleteLeases.size(), equalTo(0));
verify(shardDetector, times(2)).listShards();
verify(dynamoDBLeaseRefresher, times(expectedCreateLeases.size())).createLeaseIfNotExists(any(Lease.class));
verify(dynamoDBLeaseRefresher, times(2)).listLeases();
verify(dynamoDBLeaseRefresher, times(1)).deleteLease(any(Lease.class));
}
@Test(expected = DependencyException.class)
public void testCheckAndCreateLeasesForNewShardsAtTrimHorizonAndClosedShardWithDeleteLeaseExceptions()
throws Exception {
testCheckAndCreateLeasesForNewShardsAndClosedShardWithDeleteLeaseExceptions(ExtendedSequenceNumber.TRIM_HORIZON,
INITIAL_POSITION_TRIM_HORIZON);
}
@Test(expected = DependencyException.class)
public void testCheckAndCreateLeasesForNewShardsAtTimestampAndClosedShardWithDeleteLeaseExceptions()
throws Exception {
testCheckAndCreateLeasesForNewShardsAndClosedShardWithDeleteLeaseExceptions(ExtendedSequenceNumber.AT_TIMESTAMP,
INITIAL_POSITION_AT_TIMESTAMP);
}
private void testCheckAndCreateLeasesForNewShardsAndClosedShardWithDeleteLeaseExceptions(
final ExtendedSequenceNumber sequenceNumber,
final InitialPositionInStreamExtended position)
throws Exception {
final String shardIdPrefix = "shardId-%d";
final List<Shard> shards = constructShardListForGraphA();
final List<Lease> leases = createLeasesFromShards(shards, sequenceNumber, LEASE_OWNER);
// Marking shardId-0 as ShardEnd.
leases.stream().filter(lease -> String.format(shardIdPrefix, 0).equals(lease.leaseKey())).findFirst()
.ifPresent(lease -> lease.checkpoint(ExtendedSequenceNumber.SHARD_END));
// Marking child of shardId-0 to be processed and not at TRIM_HORIZON.
leases.stream().filter(lease -> String.format(shardIdPrefix, 6).equals(lease.leaseKey())).findFirst()
.ifPresent(lease -> lease.checkpoint(new ExtendedSequenceNumber("1")));
final ArgumentCaptor<Lease> leaseCreateCaptor = ArgumentCaptor.forClass(Lease.class);
final ArgumentCaptor<Lease> leaseDeleteCaptor = ArgumentCaptor.forClass(Lease.class);
when(shardDetector.listShards()).thenReturn(shards);
when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList()).thenReturn(leases);
when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCreateCaptor.capture())).thenReturn(true);
doThrow(new DependencyException(new Throwable("Throw for DeleteLease"))).doNothing()
.when(dynamoDBLeaseRefresher).deleteLease(leaseDeleteCaptor.capture());
// Initial call: Call to create leases.
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
final Set<Lease> createLeases = new HashSet<>(leaseCreateCaptor.getAllValues());
final Set<Lease> expectedCreateLeases = getExpectedLeasesForGraphA(shards, sequenceNumber, position);
assertThat(createLeases, equalTo(expectedCreateLeases));
verify(shardDetector, times(1)).listShards();
verify(dynamoDBLeaseRefresher, times(1)).listLeases();
verify(dynamoDBLeaseRefresher, times(expectedCreateLeases.size())).createLeaseIfNotExists(any(Lease.class));
verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
try {
// Second call: Leases already present. ShardId-0 is at ShardEnd and needs to be cleaned up. Delete fails.
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
} finally {
List<Lease> deleteLeases = leaseDeleteCaptor.getAllValues();
Set<String> shardIds = deleteLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
Set<ExtendedSequenceNumber> sequenceNumbers = deleteLeases.stream().map(Lease::checkpoint)
.collect(Collectors.toSet());
final Set<String> expectedShardIds = new HashSet<>(
Collections.singletonList(String.format(shardIdPrefix, 0)));
final Set<ExtendedSequenceNumber> expectedSequenceNumbers = new HashSet<>(
Collections.singletonList(ExtendedSequenceNumber.SHARD_END));
assertThat(deleteLeases.size(), equalTo(1));
assertThat(shardIds, equalTo(expectedShardIds));
assertThat(sequenceNumbers, equalTo(expectedSequenceNumbers));
verify(shardDetector, times(2)).listShards();
verify(dynamoDBLeaseRefresher, times(expectedCreateLeases.size())).createLeaseIfNotExists(any(Lease.class));
verify(dynamoDBLeaseRefresher, times(2)).listLeases();
verify(dynamoDBLeaseRefresher, times(1)).deleteLease(any(Lease.class));
// Final call: Leases already present. ShardId-0 is at ShardEnd and needs to be cleaned up. Delete passes.
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
deleteLeases = leaseDeleteCaptor.getAllValues();
shardIds = deleteLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
sequenceNumbers = deleteLeases.stream().map(Lease::checkpoint).collect(Collectors.toSet());
assertThat(deleteLeases.size(), equalTo(2));
assertThat(shardIds, equalTo(expectedShardIds));
assertThat(sequenceNumbers, equalTo(expectedSequenceNumbers));
verify(shardDetector, times(3)).listShards();
verify(dynamoDBLeaseRefresher, times(expectedCreateLeases.size())).createLeaseIfNotExists(any(Lease.class));
verify(dynamoDBLeaseRefresher, times(3)).listLeases();
verify(dynamoDBLeaseRefresher, times(2)).deleteLease(any(Lease.class));
}
}
@Test(expected = DependencyException.class)
@ -985,20 +876,18 @@ public class HierarchicalShardSyncerTest {
.ifPresent(lease -> lease.checkpoint(new ExtendedSequenceNumber("1")));
final ArgumentCaptor<Lease> leaseCreateCaptor = ArgumentCaptor.forClass(Lease.class);
final ArgumentCaptor<Lease> leaseDeleteCaptor = ArgumentCaptor.forClass(Lease.class);
when(shardDetector.listShards()).thenReturn(shards);
when(dynamoDBLeaseRefresher.listLeases())
.thenThrow(new DependencyException(new Throwable("Throw for ListLeases")))
.thenReturn(Collections.emptyList()).thenReturn(leases);
when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCreateCaptor.capture())).thenReturn(true);
doNothing().when(dynamoDBLeaseRefresher).deleteLease(leaseDeleteCaptor.capture());
try {
// Initial call: Call to create leases. Fails on ListLeases
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
SCOPE, ignoreUnexpectedChildShards, dynamoDBLeaseRefresher.isLeaseTableEmpty());
} finally {
verify(shardDetector, times(1)).listShards();
verify(dynamoDBLeaseRefresher, times(1)).listLeases();
@ -1008,7 +897,7 @@ public class HierarchicalShardSyncerTest {
// Second call: Leases not present, leases will be created.
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
SCOPE, ignoreUnexpectedChildShards, dynamoDBLeaseRefresher.isLeaseTableEmpty());
final Set<Lease> createLeases = new HashSet<>(leaseCreateCaptor.getAllValues());
final Set<Lease> expectedCreateLeases = getExpectedLeasesForGraphA(shards, sequenceNumber, position);
@ -1023,26 +912,17 @@ public class HierarchicalShardSyncerTest {
// Final call: Leases present, belongs to TestOwner, shardId-0 is at ShardEnd should be cleaned up.
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
final List<Lease> deleteLeases = leaseDeleteCaptor.getAllValues();
final Set<String> shardIds = deleteLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final Set<ExtendedSequenceNumber> sequenceNumbers = deleteLeases.stream().map(Lease::checkpoint)
.collect(Collectors.toSet());
SCOPE, ignoreUnexpectedChildShards, dynamoDBLeaseRefresher.isLeaseTableEmpty());
final Set<String> expectedShardIds = new HashSet<>(
Collections.singletonList(String.format(shardIdPrefix, 0)));
final Set<ExtendedSequenceNumber> expectedSequenceNumbers = new HashSet<>(
Collections.singletonList(ExtendedSequenceNumber.SHARD_END));
assertThat(deleteLeases.size(), equalTo(1));
assertThat(shardIds, equalTo(expectedShardIds));
assertThat(sequenceNumbers, equalTo(expectedSequenceNumbers));
verify(shardDetector, times(3)).listShards();
verify(dynamoDBLeaseRefresher, times(expectedCreateLeases.size())).createLeaseIfNotExists(any(Lease.class));
verify(dynamoDBLeaseRefresher, times(3)).listLeases();
verify(dynamoDBLeaseRefresher, times(1)).deleteLease(any(Lease.class));
verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
}
}
@ -1076,20 +956,18 @@ public class HierarchicalShardSyncerTest {
.ifPresent(lease -> lease.checkpoint(new ExtendedSequenceNumber("1")));
final ArgumentCaptor<Lease> leaseCreateCaptor = ArgumentCaptor.forClass(Lease.class);
final ArgumentCaptor<Lease> leaseDeleteCaptor = ArgumentCaptor.forClass(Lease.class);
when(shardDetector.listShards()).thenReturn(shards);
when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList())
.thenReturn(Collections.emptyList()).thenReturn(leases);
when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCreateCaptor.capture()))
.thenThrow(new DependencyException(new Throwable("Throw for CreateLease"))).thenReturn(true);
doNothing().when(dynamoDBLeaseRefresher).deleteLease(leaseDeleteCaptor.capture());
try {
// Initial call: No leases present, create leases. Create lease Fails
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
SCOPE, ignoreUnexpectedChildShards, dynamoDBLeaseRefresher.isLeaseTableEmpty());
} finally {
verify(shardDetector, times(1)).listShards();
verify(dynamoDBLeaseRefresher, times(1)).listLeases();
@ -1098,7 +976,7 @@ public class HierarchicalShardSyncerTest {
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
SCOPE, ignoreUnexpectedChildShards, dynamoDBLeaseRefresher.isLeaseTableEmpty());
final Set<Lease> createLeases = new HashSet<>(leaseCreateCaptor.getAllValues());
@ -1114,27 +992,13 @@ public class HierarchicalShardSyncerTest {
// Final call: Leases are present, shardId-0 is at ShardEnd needs to be cleaned up.
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
final List<Lease> deleteLeases = leaseDeleteCaptor.getAllValues();
final Set<String> shardIds = deleteLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final Set<ExtendedSequenceNumber> sequenceNumbers = deleteLeases.stream().map(Lease::checkpoint)
.collect(Collectors.toSet());
final Set<String> expectedShardIds = new HashSet<>(
Collections.singletonList(String.format(shardIdPrefix, 0)));
final Set<ExtendedSequenceNumber> expectedSequenceNumbers = new HashSet<>(
Collections.singletonList(ExtendedSequenceNumber.SHARD_END));
assertThat(deleteLeases.size(), equalTo(1));
assertThat(shardIds, equalTo(expectedShardIds));
assertThat(sequenceNumbers, equalTo(expectedSequenceNumbers));
SCOPE, ignoreUnexpectedChildShards, dynamoDBLeaseRefresher.isLeaseTableEmpty());
verify(shardDetector, times(3)).listShards();
verify(dynamoDBLeaseRefresher, times(1 + expectedCreateLeases.size()))
.createLeaseIfNotExists(any(Lease.class));
verify(dynamoDBLeaseRefresher, times(3)).listLeases();
verify(dynamoDBLeaseRefresher, times(1)).deleteLease(any(Lease.class));
verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
}
}
@ -1186,64 +1050,6 @@ public class HierarchicalShardSyncerTest {
}).collect(Collectors.toList());
}
@Test
public void testCleanUpGarbageLeaseForNonExistentShard() throws Exception {
final List<Shard> shards = constructShardListForGraphA();
final String garbageShardId = "shardId-garbage-001";
final Shard garbageShard = ShardObjectHelper.newShard(garbageShardId, null, null,
ShardObjectHelper.newSequenceNumberRange("101", null));
final Lease garbageLease = createLeaseFromShard(garbageShard, new ExtendedSequenceNumber("99"), LEASE_OWNER);
final List<Lease> leases = new ArrayList<>(
createLeasesFromShards(shards, ExtendedSequenceNumber.TRIM_HORIZON, LEASE_OWNER));
leases.add(garbageLease);
final ArgumentCaptor<Lease> leaseCaptor = ArgumentCaptor.forClass(Lease.class);
when(shardDetector.listShards()).thenReturn(shards);
when(dynamoDBLeaseRefresher.listLeases()).thenReturn(leases);
doNothing().when(dynamoDBLeaseRefresher).deleteLease(leaseCaptor.capture());
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher,
INITIAL_POSITION_TRIM_HORIZON, SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
assertThat(leaseCaptor.getAllValues().size(), equalTo(1));
assertThat(leaseCaptor.getValue(), equalTo(garbageLease));
verify(shardDetector, times(2)).listShards();
verify(dynamoDBLeaseRefresher).listLeases();
verify(dynamoDBLeaseRefresher).deleteLease(any(Lease.class));
verify(dynamoDBLeaseRefresher, never()).createLeaseIfNotExists(any(Lease.class));
}
@Test
public void testCleanUpGarbageLeaseForNonExistentShardForMultiStream() throws Exception {
final List<Shard> shards = constructShardListForGraphA();
final String garbageShardId = "shardId-garbage-001";
final Shard garbageShard = ShardObjectHelper.newShard(garbageShardId, null, null,
ShardObjectHelper.newSequenceNumberRange("101", null));
final Lease garbageLease = createMultiStreamLeaseFromShard(garbageShard, new ExtendedSequenceNumber("99"), LEASE_OWNER);
final List<Lease> leases = new ArrayList<>(
createMultiStreamLeasesFromShards(shards, ExtendedSequenceNumber.TRIM_HORIZON, LEASE_OWNER));
leases.add(garbageLease);
final ArgumentCaptor<Lease> leaseCaptor = ArgumentCaptor.forClass(Lease.class);
when(shardDetector.listShards()).thenReturn(shards);
when(dynamoDBLeaseRefresher.listLeasesForStream(any(StreamIdentifier.class))).thenReturn(leases);
doNothing().when(dynamoDBLeaseRefresher).deleteLease(leaseCaptor.capture());
setupMultiStream();
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher,
INITIAL_POSITION_TRIM_HORIZON, SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
assertThat(leaseCaptor.getAllValues().size(), equalTo(1));
assertThat(leaseCaptor.getValue(), equalTo(garbageLease));
verify(shardDetector, times(2)).listShards();
verify(dynamoDBLeaseRefresher).listLeasesForStream(any(StreamIdentifier.class));
verify(dynamoDBLeaseRefresher).deleteLease(any(Lease.class));
verify(dynamoDBLeaseRefresher, never()).createLeaseIfNotExists(any(Lease.class));
}
private void testCheckAndCreateLeasesForShardsIfMissing(InitialPositionInStreamExtended initialPosition)
throws Exception {
final String shardId0 = "shardId-0";
@ -1279,7 +1085,7 @@ public class HierarchicalShardSyncerTest {
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, initialPosition,
SCOPE, cleanupLeasesOfCompletedShards, false, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
SCOPE, false, dynamoDBLeaseRefresher.isLeaseTableEmpty());
final List<Lease> leases = leaseCaptor.getAllValues();
final Set<String> leaseKeys = leases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
@ -2473,7 +2279,7 @@ public class HierarchicalShardSyncerTest {
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, initialPosition,
SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
SCOPE, ignoreUnexpectedChildShards, dynamoDBLeaseRefresher.isLeaseTableEmpty());
verify(shardDetector, atLeast(1)).listShardsWithFilter(shardFilter);
verify(shardDetector, never()).listShards();
@ -2495,7 +2301,7 @@ public class HierarchicalShardSyncerTest {
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
SCOPE, ignoreUnexpectedChildShards, dynamoDBLeaseRefresher.isLeaseTableEmpty());
verify(shardDetector, atLeast(1)).listShards();
}
@ -2518,7 +2324,7 @@ public class HierarchicalShardSyncerTest {
try {
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases,
SCOPE, ignoreUnexpectedChildShards,
dynamoDBLeaseRefresher.isLeaseTableEmpty());
} finally {
verify(shardDetector, times(3)).listShardsWithFilter(any(ShardFilter.class)); // Verify retries.
@ -2547,7 +2353,7 @@ public class HierarchicalShardSyncerTest {
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases,
SCOPE, ignoreUnexpectedChildShards,
dynamoDBLeaseRefresher.isLeaseTableEmpty());
verify(shardDetector, times(3)).listShardsWithFilter(any(ShardFilter.class)); // Verify retries.
@ -2570,7 +2376,7 @@ public class HierarchicalShardSyncerTest {
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases,
SCOPE, ignoreUnexpectedChildShards,
dynamoDBLeaseRefresher.isLeaseTableEmpty());
verify(shardDetector, times(1)).listShardsWithFilter(any(ShardFilter.class)); // Verify retries.