Adding empty lease table sync.

This commit is contained in:
Joshua Kim 2020-03-20 05:52:27 -07:00
parent f03a0fc561
commit e906a835f8
10 changed files with 374 additions and 253 deletions

View file

@ -24,12 +24,14 @@ import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.function.BiFunction; import java.util.function.BiFunction;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import lombok.AllArgsConstructor;
import lombok.Data; import lombok.Data;
import lombok.experimental.Accessors; import lombok.experimental.Accessors;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
@ -38,6 +40,8 @@ import lombok.NonNull;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardFilter;
import software.amazon.awssdk.services.kinesis.model.ShardFilterType;
import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStream;
@ -85,6 +89,7 @@ public class HierarchicalShardSyncer {
* @param shardDetector * @param shardDetector
* @param leaseRefresher * @param leaseRefresher
* @param initialPosition * @param initialPosition
* @param garbageCollectLeases
* @param cleanupLeasesOfCompletedShards * @param cleanupLeasesOfCompletedShards
* @param ignoreUnexpectedChildShards * @param ignoreUnexpectedChildShards
* @param scope * @param scope
@ -96,20 +101,24 @@ public class HierarchicalShardSyncer {
// CHECKSTYLE:OFF CyclomaticComplexity // CHECKSTYLE:OFF CyclomaticComplexity
public synchronized void checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector, public synchronized void checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector,
final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition, final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition,
final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards, final boolean garbageCollectLeases, final boolean cleanupLeasesOfCompletedShards,
final MetricsScope scope) throws DependencyException, InvalidStateException, final boolean ignoreUnexpectedChildShards, final MetricsScope scope)
ProvisionedThroughputException, KinesisClientLibIOException { throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
final List<Shard> latestShards = getShardList(shardDetector); final List<Shard> latestShards = leaseRefresher.isLeaseTableEmpty() ?
checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, cleanupLeasesOfCompletedShards, getShardListAtInitialPosition(shardDetector, initialPosition) : getShardList(shardDetector);
ignoreUnexpectedChildShards, scope, latestShards); checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, garbageCollectLeases,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, latestShards);
} }
//Provide a pre-collcted list of shards to avoid calling ListShards API //Provide a pre-collcted list of shards to avoid calling ListShards API
public synchronized void checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector, public synchronized void checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector,
final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition, final boolean cleanupLeasesOfCompletedShards, final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition,
final boolean garbageCollectLeases, final boolean cleanupLeasesOfCompletedShards,
final boolean ignoreUnexpectedChildShards, final MetricsScope scope, List<Shard> latestShards) final boolean ignoreUnexpectedChildShards, final MetricsScope scope, List<Shard> latestShards)
throws DependencyException, InvalidStateException, throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
ProvisionedThroughputException, KinesisClientLibIOException {
final boolean isLeaseTableEmpty = leaseRefresher.isLeaseTableEmpty();
if (!CollectionUtils.isNullOrEmpty(latestShards)) { if (!CollectionUtils.isNullOrEmpty(latestShards)) {
log.debug("Num shards: {}", latestShards.size()); log.debug("Num shards: {}", latestShards.size());
} }
@ -125,8 +134,10 @@ public class HierarchicalShardSyncer {
getLeasesForStream(shardDetector.streamIdentifier(), leaseRefresher) : getLeasesForStream(shardDetector.streamIdentifier(), leaseRefresher) :
leaseRefresher.listLeases(); leaseRefresher.listLeases();
final MultiStreamArgs multiStreamArgs = new MultiStreamArgs(isMultiStreamMode, shardDetector.streamIdentifier()); final MultiStreamArgs multiStreamArgs = new MultiStreamArgs(isMultiStreamMode, shardDetector.streamIdentifier());
final List<Lease> newLeasesToCreate = determineNewLeasesToCreate(latestShards, currentLeases, initialPosition, final LeaseSynchronizer leaseSynchronizer = isLeaseTableEmpty ? new EmptyLeaseTableSynchronizer() :
inconsistentShardIds, multiStreamArgs); new NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap);
final List<Lease> newLeasesToCreate = determineNewLeasesToCreate(leaseSynchronizer, latestShards, currentLeases,
initialPosition, inconsistentShardIds, multiStreamArgs);
log.debug("Num new leases to create: {}", newLeasesToCreate.size()); log.debug("Num new leases to create: {}", newLeasesToCreate.size());
for (Lease lease : newLeasesToCreate) { for (Lease lease : newLeasesToCreate) {
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
@ -140,8 +151,10 @@ public class HierarchicalShardSyncer {
} }
final List<Lease> trackedLeases = new ArrayList<>(currentLeases); final List<Lease> trackedLeases = new ArrayList<>(currentLeases);
trackedLeases.addAll(newLeasesToCreate); trackedLeases.addAll(newLeasesToCreate);
if (!isLeaseTableEmpty && garbageCollectLeases) {
cleanupGarbageLeases(shardDetector, latestShards, trackedLeases, leaseRefresher, multiStreamArgs); cleanupGarbageLeases(shardDetector, latestShards, trackedLeases, leaseRefresher, multiStreamArgs);
if (cleanupLeasesOfCompletedShards) { }
if (!isLeaseTableEmpty && cleanupLeasesOfCompletedShards) {
cleanupLeasesOfFinishedShards(currentLeases, shardIdToShardMap, shardIdToChildShardIdsMap, trackedLeases, cleanupLeasesOfFinishedShards(currentLeases, shardIdToShardMap, shardIdToChildShardIdsMap, trackedLeases,
leaseRefresher, multiStreamArgs); leaseRefresher, multiStreamArgs);
} }
@ -299,6 +312,33 @@ public class HierarchicalShardSyncer {
return shardIdToChildShardIdsMap; return shardIdToChildShardIdsMap;
} }
private static ShardFilter getShardFilterFromInitialPosition(InitialPositionInStreamExtended initialPositionInStreamExtended) {
ShardFilter.Builder builder = ShardFilter.builder();
switch (initialPositionInStreamExtended.getInitialPositionInStream()) {
case LATEST:
builder = builder.type(ShardFilterType.AT_LATEST);
break;
case TRIM_HORIZON:
builder = builder.type(ShardFilterType.AT_TRIM_HORIZON);
break;
case AT_TIMESTAMP:
builder = builder.type(ShardFilterType.AT_TIMESTAMP).timestamp(initialPositionInStreamExtended.getTimestamp().toInstant());
break;
}
return builder.build();
}
private static List<Shard> getShardListAtInitialPosition(@NonNull final ShardDetector shardDetector,
InitialPositionInStreamExtended initialPositionInStreamExtended) throws KinesisClientLibIOException {
final ShardFilter shardFilter = getShardFilterFromInitialPosition(initialPositionInStreamExtended);
final Optional<List<Shard>> shards = Optional.of(shardDetector.listShardsWithFilter(shardFilter));
return shards.orElseThrow(() -> new KinesisClientLibIOException("Stream is not in ACTIVE OR UPDATING state - " +
"will retry getting the shard list."));
}
private static List<Shard> getShardList(@NonNull final ShardDetector shardDetector) throws KinesisClientLibIOException { private static List<Shard> getShardList(@NonNull final ShardDetector shardDetector) throws KinesisClientLibIOException {
final List<Shard> shards = shardDetector.listShards(); final List<Shard> shards = shardDetector.listShards();
if (shards == null) { if (shards == null) {
@ -312,42 +352,7 @@ public class HierarchicalShardSyncer {
* Determine new leases to create and their initial checkpoint. * Determine new leases to create and their initial checkpoint.
* Note: Package level access only for testing purposes. * Note: Package level access only for testing purposes.
* *
* For each open (no ending sequence number) shard without open parents that doesn't already have a lease, * @param leaseSynchronizer determines the strategy we'll be using to update any new leases.
* determine if it is a descendent of any shard which is or will be processed (e.g. for which a lease exists):
* If so, set checkpoint of the shard to TrimHorizon and also create leases for ancestors if needed.
* If not, set checkpoint of the shard to the initial position specified by the client.
* To check if we need to create leases for ancestors, we use the following rules:
* * If we began (or will begin) processing data for a shard, then we must reach end of that shard before
* we begin processing data from any of its descendants.
* * A shard does not start processing data until data from all its parents has been processed.
* Note, if the initial position is LATEST and a shard has two parents and only one is a descendant - we'll create
* leases corresponding to both the parents - the parent shard which is not a descendant will have
* its checkpoint set to Latest.
*
* We assume that if there is an existing lease for a shard, then either:
* * we have previously created a lease for its parent (if it was needed), or
* * the parent shard has expired.
*
* For example:
* Shard structure (each level depicts a stream segment):
* 0 1 2 3 4 5 - shards till epoch 102
* \ / \ / | |
* 6 7 4 5 - shards from epoch 103 - 205
* \ / | / \
* 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
* Current leases: (3, 4, 5)
* New leases to create: (2, 6, 7, 8, 9, 10)
*
* The leases returned are sorted by the starting sequence number - following the same order
* when persisting the leases in DynamoDB will ensure that we recover gracefully if we fail
* before creating all the leases.
*
* If a shard has no existing lease, is open, and is a descendant of a parent which is still open, we ignore it
* here; this happens when the list of shards is inconsistent, which could be due to pagination delay for very
* high shard count streams (i.e., dynamodb streams for tables with thousands of partitions). This can only
* currently happen here if ignoreUnexpectedChildShards was true in syncShardleases.
*
*
* @param shards List of all shards in Kinesis (we'll create new leases based on this set) * @param shards List of all shards in Kinesis (we'll create new leases based on this set)
* @param currentLeases List of current leases * @param currentLeases List of current leases
* @param initialPosition One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. We'll start fetching records from that * @param initialPosition One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. We'll start fetching records from that
@ -355,81 +360,15 @@ public class HierarchicalShardSyncer {
* @param inconsistentShardIds Set of child shard ids having open parents. * @param inconsistentShardIds Set of child shard ids having open parents.
* @return List of new leases to create sorted by starting sequenceNumber of the corresponding shard * @return List of new leases to create sorted by starting sequenceNumber of the corresponding shard
*/ */
static List<Lease> determineNewLeasesToCreate(final List<Shard> shards, final List<Lease> currentLeases, static List<Lease> determineNewLeasesToCreate(final LeaseSynchronizer leaseSynchronizer, final List<Shard> shards,
final InitialPositionInStreamExtended initialPosition, final Set<String> inconsistentShardIds, final List<Lease> currentLeases, final InitialPositionInStreamExtended initialPosition,
final MultiStreamArgs multiStreamArgs) { final Set<String> inconsistentShardIds, final MultiStreamArgs multiStreamArgs) {
final Map<String, Lease> shardIdToNewLeaseMap = new HashMap<>(); return leaseSynchronizer.determineNewLeasesToCreate(shards, currentLeases, initialPosition, inconsistentShardIds, multiStreamArgs);
final Map<String, Shard> shardIdToShardMapOfAllKinesisShards = constructShardIdToShardMap(shards);
final Set<String> shardIdsOfCurrentLeases = currentLeases.stream()
.peek(lease -> log.debug("Existing lease: {}", lease))
.map(lease -> shardIdFromLeaseDeducer.apply(lease, multiStreamArgs))
.collect(Collectors.toSet());
final List<Shard> openShards = getOpenShards(shards);
final Map<String, Boolean> memoizationContext = new HashMap<>();
// Iterate over the open shards and find those that don't have any lease entries.
for (Shard shard : openShards) {
final String shardId = shard.shardId();
log.debug("Evaluating leases for open shard {} and its ancestors.", shardId);
if (shardIdsOfCurrentLeases.contains(shardId)) {
log.debug("Lease for shardId {} already exists. Not creating a lease", shardId);
} else if (inconsistentShardIds.contains(shardId)) {
log.info("shardId {} is an inconsistent child. Not creating a lease", shardId);
} else {
log.debug("Need to create a lease for shardId {}", shardId);
final Lease newLease = multiStreamArgs.isMultiStreamMode() ?
newKCLMultiStreamLease(shard, multiStreamArgs.streamIdentifier()) :
newKCLLease(shard);
final boolean isDescendant = checkIfDescendantAndAddNewLeasesForAncestors(shardId, initialPosition,
shardIdsOfCurrentLeases, shardIdToShardMapOfAllKinesisShards, shardIdToNewLeaseMap,
memoizationContext, multiStreamArgs);
/**
* If the shard is a descendant and the specified initial position is AT_TIMESTAMP, then the
* checkpoint should be set to AT_TIMESTAMP, else to TRIM_HORIZON. For AT_TIMESTAMP, we will add a
* lease just like we do for TRIM_HORIZON. However we will only return back records with server-side
* timestamp at or after the specified initial position timestamp.
*
* Shard structure (each level depicts a stream segment):
* 0 1 2 3 4 5 - shards till epoch 102
* \ / \ / | |
* 6 7 4 5 - shards from epoch 103 - 205
* \ / | /\
* 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
*
* Current leases: empty set
*
* For the above example, suppose the initial position in stream is set to AT_TIMESTAMP with
* timestamp value 206. We will then create new leases for all the shards (with checkpoint set to
* AT_TIMESTAMP), including the ancestor shards with epoch less than 206. However as we begin
* processing the ancestor shards, their checkpoints would be updated to SHARD_END and their leases
* would then be deleted since they won't have records with server-side timestamp at/after 206. And
* after that we will begin processing the descendant shards with epoch at/after 206 and we will
* return the records that meet the timestamp requirement for these shards.
*/
if (isDescendant
&& !initialPosition.getInitialPositionInStream().equals(InitialPositionInStream.AT_TIMESTAMP)) {
newLease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
} else {
newLease.checkpoint(convertToCheckpoint(initialPosition));
}
log.debug("Set checkpoint of {} to {}", newLease.leaseKey(), newLease.checkpoint());
shardIdToNewLeaseMap.put(shardId, newLease);
}
} }
final List<Lease> newLeasesToCreate = new ArrayList<>(shardIdToNewLeaseMap.values()); static List<Lease> determineNewLeasesToCreate(final LeaseSynchronizer leaseSynchronizer, final List<Shard> shards,
final Comparator<Lease> startingSequenceNumberComparator = new StartingSequenceNumberAndShardIdBasedComparator( final List<Lease> currentLeases, final InitialPositionInStreamExtended initialPosition,final Set<String> inconsistentShardIds) {
shardIdToShardMapOfAllKinesisShards, multiStreamArgs); return determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, initialPosition, inconsistentShardIds,
newLeasesToCreate.sort(startingSequenceNumberComparator);
return newLeasesToCreate;
}
static List<Lease> determineNewLeasesToCreate(final List<Shard> shards, final List<Lease> currentLeases,
final InitialPositionInStreamExtended initialPosition, final Set<String> inconsistentShardIds) {
return determineNewLeasesToCreate(shards, currentLeases, initialPosition, inconsistentShardIds,
new MultiStreamArgs(false, null)); new MultiStreamArgs(false, null));
} }
@ -437,10 +376,10 @@ public class HierarchicalShardSyncer {
* Determine new leases to create and their initial checkpoint. * Determine new leases to create and their initial checkpoint.
* Note: Package level access only for testing purposes. * Note: Package level access only for testing purposes.
*/ */
static List<Lease> determineNewLeasesToCreate(final List<Shard> shards, final List<Lease> currentLeases, static List<Lease> determineNewLeasesToCreate(final LeaseSynchronizer leaseSynchronizer, final List<Shard> shards,
final InitialPositionInStreamExtended initialPosition) { final List<Lease> currentLeases, final InitialPositionInStreamExtended initialPosition) {
final Set<String> inconsistentShardIds = new HashSet<>(); final Set<String> inconsistentShardIds = new HashSet<>();
return determineNewLeasesToCreate(shards, currentLeases, initialPosition, inconsistentShardIds); return determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, initialPosition, inconsistentShardIds);
} }
/** /**
@ -450,6 +389,7 @@ public class HierarchicalShardSyncer {
* See javadoc of determineNewLeasesToCreate() for rules and example. * See javadoc of determineNewLeasesToCreate() for rules and example.
* *
* @param shardId The shardId to check. * @param shardId The shardId to check.
* @param shardId The shardId to check.
* @param initialPosition One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. We'll start fetching records from that * @param initialPosition One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. We'll start fetching records from that
* location in the shard (when an application starts up for the first time - and there are no checkpoints). * location in the shard (when an application starts up for the first time - and there are no checkpoints).
* @param shardIdsOfCurrentLeases The shardIds for the current leases. * @param shardIdsOfCurrentLeases The shardIds for the current leases.
@ -682,6 +622,7 @@ public class HierarchicalShardSyncer {
if (!CollectionUtils.isNullOrEmpty(leasesOfClosedShards)) { if (!CollectionUtils.isNullOrEmpty(leasesOfClosedShards)) {
assertClosedShardsAreCoveredOrAbsent(shardIdToShardMap, shardIdToChildShardIdsMap, shardIdsOfClosedShards); assertClosedShardsAreCoveredOrAbsent(shardIdToShardMap, shardIdToChildShardIdsMap, shardIdsOfClosedShards);
//TODO: Verify before LTR launch that ending sequence number is still returned from the service.
Comparator<? super Lease> startingSequenceNumberComparator = new StartingSequenceNumberAndShardIdBasedComparator( Comparator<? super Lease> startingSequenceNumberComparator = new StartingSequenceNumberAndShardIdBasedComparator(
shardIdToShardMap, multiStreamArgs); shardIdToShardMap, multiStreamArgs);
leasesOfClosedShards.sort(startingSequenceNumberComparator); leasesOfClosedShards.sort(startingSequenceNumberComparator);
@ -864,4 +805,178 @@ public class HierarchicalShardSyncer {
private final StreamIdentifier streamIdentifier; private final StreamIdentifier streamIdentifier;
} }
@VisibleForTesting
static interface LeaseSynchronizer {
List<Lease> determineNewLeasesToCreate(List<Shard> shards, List<Lease> currentLeases,
InitialPositionInStreamExtended initialPosition, Set<String> inconsistentShardIds,
MultiStreamArgs multiStreamArgs);
}
@Slf4j
@AllArgsConstructor
static class EmptyLeaseTableSynchronizer implements LeaseSynchronizer {
@Override
public List<Lease> determineNewLeasesToCreate(List<Shard> shards, List<Lease> currentLeases,
InitialPositionInStreamExtended initialPosition, Set<String> inconsistentShardIds, MultiStreamArgs multiStreamArgs) {
final Map<String, Shard> shardIdToShardMapOfAllKinesisShards = constructShardIdToShardMap(shards);
currentLeases.stream().peek(lease -> log.debug("Existing lease: {}", lease))
.map(lease -> shardIdFromLeaseDeducer.apply(lease, multiStreamArgs))
.collect(Collectors.toSet());
final List<Lease> newLeasesToCreate = getLeasesToCreateForOpenAndClosedShards(initialPosition, shards);
//TODO: Verify before LTR launch that ending sequence number is still returned from the service.
final Comparator<Lease> startingSequenceNumberComparator =
new StartingSequenceNumberAndShardIdBasedComparator(shardIdToShardMapOfAllKinesisShards, multiStreamArgs);
newLeasesToCreate.sort(startingSequenceNumberComparator);
return newLeasesToCreate;
}
/**
* Helper method to create leases. For an empty lease table, we will be creating leases for all shards
* regardless of if they are open or closed. Closed shards will be unblocked via child shard information upon
* reaching SHARD_END.
*/
private List<Lease> getLeasesToCreateForOpenAndClosedShards(InitialPositionInStreamExtended initialPosition, List<Shard> shards) {
final Map<String, Lease> shardIdToNewLeaseMap = new HashMap<>();
for (Shard shard : shards) {
final String shardId = shard.shardId();
final Lease lease = newKCLLease(shard);
lease.checkpoint(convertToCheckpoint(initialPosition));
log.debug("Need to create a lease for shard with shardId {}", shardId);
shardIdToNewLeaseMap.put(shardId, lease);
}
return new ArrayList(shardIdToNewLeaseMap.values());
}
}
@Slf4j
@AllArgsConstructor
static class NonEmptyLeaseTableSynchronizer implements LeaseSynchronizer {
private final ShardDetector shardDetector;
private final Map<String, Shard> shardIdToShardMap;
private final Map<String, Set<String>> shardIdToChildShardIdsMap;
/**
* Determine new leases to create and their initial checkpoint.
* Note: Package level access only for testing purposes.
* <p>
* For each open (no ending sequence number) shard without open parents that doesn't already have a lease,
* determine if it is a descendent of any shard which is or will be processed (e.g. for which a lease exists):
* If so, set checkpoint of the shard to TrimHorizon and also create leases for ancestors if needed.
* If not, set checkpoint of the shard to the initial position specified by the client.
* To check if we need to create leases for ancestors, we use the following rules:
* * If we began (or will begin) processing data for a shard, then we must reach end of that shard before
* we begin processing data from any of its descendants.
* * A shard does not start processing data until data from all its parents has been processed.
* Note, if the initial position is LATEST and a shard has two parents and only one is a descendant - we'll create
* leases corresponding to both the parents - the parent shard which is not a descendant will have
* its checkpoint set to Latest.
* <p>
* We assume that if there is an existing lease for a shard, then either:
* * we have previously created a lease for its parent (if it was needed), or
* * the parent shard has expired.
* <p>
* For example:
* Shard structure (each level depicts a stream segment):
* 0 1 2 3 4 5 - shards till epoch 102
* \ / \ / | |
* 6 7 4 5 - shards from epoch 103 - 205
* \ / | / \
* 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
* Current leases: (3, 4, 5)
* New leases to create: (2, 6, 7, 8, 9, 10)
* <p>
* The leases returned are sorted by the starting sequence number - following the same order
* when persisting the leases in DynamoDB will ensure that we recover gracefully if we fail
* before creating all the leases.
* <p>
* If a shard has no existing lease, is open, and is a descendant of a parent which is still open, we ignore it
* here; this happens when the list of shards is inconsistent, which could be due to pagination delay for very
* high shard count streams (i.e., dynamodb streams for tables with thousands of partitions). This can only
* currently happen here if ignoreUnexpectedChildShards was true in syncShardleases.
*
* @return List of new leases to create sorted by starting sequenceNumber of the corresponding shard
*/
@Override
public synchronized List<Lease> determineNewLeasesToCreate(List<Shard> shards, List<Lease> currentLeases,
InitialPositionInStreamExtended initialPosition, Set<String> inconsistentShardIds, MultiStreamArgs multiStreamArgs) {
final Map<String, Lease> shardIdToNewLeaseMap = new HashMap<>();
final Map<String, Shard> shardIdToShardMapOfAllKinesisShards = constructShardIdToShardMap(shards);
final Set<String> shardIdsOfCurrentLeases = currentLeases.stream()
.peek(lease -> log.debug("Existing lease: {}", lease))
.map(lease -> shardIdFromLeaseDeducer.apply(lease, multiStreamArgs))
.collect(Collectors.toSet());
final List<Shard> openShards = getOpenShards(shards);
final Map<String, Boolean> memoizationContext = new HashMap<>();
// Iterate over the open shards and find those that don't have any lease entries.
for (Shard shard : openShards) {
final String shardId = shard.shardId();
log.debug("Evaluating leases for open shard {} and its ancestors.", shardId);
if (shardIdsOfCurrentLeases.contains(shardId)) {
log.debug("Lease for shardId {} already exists. Not creating a lease", shardId);
} else if (inconsistentShardIds.contains(shardId)) {
log.info("shardId {} is an inconsistent child. Not creating a lease", shardId);
} else {
log.debug("Need to create a lease for shardId {}", shardId);
final Lease newLease = multiStreamArgs.isMultiStreamMode() ?
newKCLMultiStreamLease(shard, multiStreamArgs.streamIdentifier()) :
newKCLLease(shard);
final boolean isDescendant = checkIfDescendantAndAddNewLeasesForAncestors(shardId, initialPosition,
shardIdsOfCurrentLeases, shardIdToShardMapOfAllKinesisShards, shardIdToNewLeaseMap,
memoizationContext, multiStreamArgs);
/**
* If the shard is a descendant and the specified initial position is AT_TIMESTAMP, then the
* checkpoint should be set to AT_TIMESTAMP, else to TRIM_HORIZON. For AT_TIMESTAMP, we will add a
* lease just like we do for TRIM_HORIZON. However we will only return back records with server-side
* timestamp at or after the specified initial position timestamp.
*
* Shard structure (each level depicts a stream segment):
* 0 1 2 3 4 5 - shards till epoch 102
* \ / \ / | |
* 6 7 4 5 - shards from epoch 103 - 205
* \ / | /\
* 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
*
* Current leases: empty set
*
* For the above example, suppose the initial position in stream is set to AT_TIMESTAMP with
* timestamp value 206. We will then create new leases for all the shards (with checkpoint set to
* AT_TIMESTAMP), including the ancestor shards with epoch less than 206. However as we begin
* processing the ancestor shards, their checkpoints would be updated to SHARD_END and their leases
* would then be deleted since they won't have records with server-side timestamp at/after 206. And
* after that we will begin processing the descendant shards with epoch at/after 206 and we will
* return the records that meet the timestamp requirement for these shards.
*/
if (isDescendant
&& !initialPosition.getInitialPositionInStream().equals(InitialPositionInStream.AT_TIMESTAMP)) {
newLease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
} else {
newLease.checkpoint(convertToCheckpoint(initialPosition));
}
log.debug("Set checkpoint of {} to {}", newLease.leaseKey(), newLease.checkpoint());
shardIdToNewLeaseMap.put(shardId, newLease);
}
}
final List<Lease> newLeasesToCreate = new ArrayList<>(shardIdToNewLeaseMap.values());
//TODO: Verify before LTR launch that ending sequence number is still returned from the service.
final Comparator<Lease> startingSequenceNumberComparator = new StartingSequenceNumberAndShardIdBasedComparator(
shardIdToShardMapOfAllKinesisShards, multiStreamArgs);
newLeasesToCreate.sort(startingSequenceNumberComparator);
return newLeasesToCreate;
}
}
} }

View file

@ -42,6 +42,7 @@ import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
import software.amazon.awssdk.services.kinesis.model.ListShardsResponse; import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
import software.amazon.awssdk.services.kinesis.model.ResourceInUseException; import software.amazon.awssdk.services.kinesis.model.ResourceInUseException;
import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardFilter;
import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.FutureUtils; import software.amazon.kinesis.common.FutureUtils;
@ -149,12 +150,18 @@ public class KinesisShardDetector implements ShardDetector {
@Override @Override
@Synchronized @Synchronized
public List<Shard> listShards() { public List<Shard> listShards() {
return listShardsWithFilter(null);
}
@Override
@Synchronized
public List<Shard> listShardsWithFilter(ShardFilter shardFilter) {
final List<Shard> shards = new ArrayList<>(); final List<Shard> shards = new ArrayList<>();
ListShardsResponse result; ListShardsResponse result;
String nextToken = null; String nextToken = null;
do { do {
result = listShards(nextToken); result = listShards(shardFilter, nextToken);
if (result == null) { if (result == null) {
/* /*
@ -172,13 +179,13 @@ public class KinesisShardDetector implements ShardDetector {
return shards; return shards;
} }
private ListShardsResponse listShards(final String nextToken) { private ListShardsResponse listShards(ShardFilter shardFilter, final String nextToken) {
final AWSExceptionManager exceptionManager = new AWSExceptionManager(); final AWSExceptionManager exceptionManager = new AWSExceptionManager();
exceptionManager.add(LimitExceededException.class, t -> t); exceptionManager.add(LimitExceededException.class, t -> t);
exceptionManager.add(ResourceInUseException.class, t -> t); exceptionManager.add(ResourceInUseException.class, t -> t);
exceptionManager.add(KinesisException.class, t -> t); exceptionManager.add(KinesisException.class, t -> t);
ListShardsRequest.Builder request = KinesisRequestsBuilder.listShardsRequestBuilder(); ListShardsRequest.Builder request = KinesisRequestsBuilder.listShardsRequestBuilder().shardFilter(shardFilter);
if (StringUtils.isEmpty(nextToken)) { if (StringUtils.isEmpty(nextToken)) {
request = request.streamName(streamIdentifier.streamName()); request = request.streamName(streamIdentifier.streamName());
} else { } else {

View file

@ -16,6 +16,7 @@
package software.amazon.kinesis.leases; package software.amazon.kinesis.leases;
import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardFilter;
import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.common.StreamIdentifier;
import java.util.List; import java.util.List;
@ -28,6 +29,8 @@ public interface ShardDetector {
List<Shard> listShards(); List<Shard> listShards();
List<Shard> listShardsWithFilter(ShardFilter shardFilter);
default StreamIdentifier streamIdentifier() { default StreamIdentifier streamIdentifier() {
throw new UnsupportedOperationException("StreamName not available"); throw new UnsupportedOperationException("StreamName not available");
} }

View file

@ -46,6 +46,7 @@ public class ShardSyncTask implements ConsumerTask {
@NonNull @NonNull
private final InitialPositionInStreamExtended initialPosition; private final InitialPositionInStreamExtended initialPosition;
private final boolean cleanupLeasesUponShardCompletion; private final boolean cleanupLeasesUponShardCompletion;
private final boolean garbageCollectLeases;
private final boolean ignoreUnexpectedChildShards; private final boolean ignoreUnexpectedChildShards;
private final long shardSyncTaskIdleTimeMillis; private final long shardSyncTaskIdleTimeMillis;
@NonNull @NonNull
@ -66,8 +67,9 @@ public class ShardSyncTask implements ConsumerTask {
boolean shardSyncSuccess = true; boolean shardSyncSuccess = true;
try { try {
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher,
cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, scope); initialPosition, cleanupLeasesUponShardCompletion, garbageCollectLeases, ignoreUnexpectedChildShards, scope);
if (shardSyncTaskIdleTimeMillis > 0) { if (shardSyncTaskIdleTimeMillis > 0) {
Thread.sleep(shardSyncTaskIdleTimeMillis); Thread.sleep(shardSyncTaskIdleTimeMillis);
} }

View file

@ -128,6 +128,7 @@ public class ShardSyncTaskManager {
leaseRefresher, leaseRefresher,
initialPositionInStream, initialPositionInStream,
cleanupLeasesUponShardCompletion, cleanupLeasesUponShardCompletion,
true,
ignoreUnexpectedChildShards, ignoreUnexpectedChildShards,
shardSyncIdleTimeMillis, shardSyncIdleTimeMillis,
hierarchicalShardSyncer, hierarchicalShardSyncer,
@ -166,6 +167,7 @@ public class ShardSyncTaskManager {
leaseRefresher, leaseRefresher,
initialPositionInStream, initialPositionInStream,
cleanupLeasesUponShardCompletion, cleanupLeasesUponShardCompletion,
true,
ignoreUnexpectedChildShards, ignoreUnexpectedChildShards,
shardSyncIdleTimeMillis, shardSyncIdleTimeMillis,
hierarchicalShardSyncer, hierarchicalShardSyncer,

View file

@ -41,6 +41,6 @@ public class ShardSyncer {
final MetricsScope scope) throws DependencyException, InvalidStateException, ProvisionedThroughputException, final MetricsScope scope) throws DependencyException, InvalidStateException, ProvisionedThroughputException,
KinesisClientLibIOException { KinesisClientLibIOException {
HIERARCHICAL_SHARD_SYNCER.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, HIERARCHICAL_SHARD_SYNCER.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope); true, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope);
} }
} }

View file

@ -155,7 +155,7 @@ public class ShutdownTask implements ConsumerTask {
log.debug("Looking for child shards of shard {}", shardInfo.shardId()); log.debug("Looking for child shards of shard {}", shardInfo.shardId());
// create leases for the child shards // create leases for the child shards
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseCoordinator.leaseRefresher(), hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseCoordinator.leaseRefresher(),
initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, latestShards); initialPositionInStream, cleanupLeasesOfCompletedShards, true, ignoreUnexpectedChildShards, scope, latestShards);
log.debug("Finished checking for child shards of shard {}", shardInfo.shardId()); log.debug("Finished checking for child shards of shard {}", shardInfo.shardId());
} }

View file

@ -80,6 +80,7 @@ public class HierarchicalShardSyncerTest {
private static final HierarchicalShardSyncer.MultiStreamArgs MULTI_STREAM_ARGS = new HierarchicalShardSyncer.MultiStreamArgs( private static final HierarchicalShardSyncer.MultiStreamArgs MULTI_STREAM_ARGS = new HierarchicalShardSyncer.MultiStreamArgs(
MULTISTREAM_MODE_ON, StreamIdentifier.multiStreamInstance(STREAM_IDENTIFIER)); MULTISTREAM_MODE_ON, StreamIdentifier.multiStreamInstance(STREAM_IDENTIFIER));
private final boolean garbageCollectLeases = true;
private final boolean cleanupLeasesOfCompletedShards = true; private final boolean cleanupLeasesOfCompletedShards = true;
private final boolean ignoreUnexpectedChildShards = false; private final boolean ignoreUnexpectedChildShards = false;
@ -112,9 +113,10 @@ public class HierarchicalShardSyncerTest {
public void testDetermineNewLeasesToCreateNoShards() { public void testDetermineNewLeasesToCreateNoShards() {
final List<Shard> shards = Collections.emptyList(); final List<Shard> shards = Collections.emptyList();
final List<Lease> leases = Collections.emptyList(); final List<Lease> leases = Collections.emptyList();
final HierarchicalShardSyncer.LeaseSynchronizer emptyLeaseTableSynchronizer = new HierarchicalShardSyncer.EmptyLeaseTableSynchronizer();
assertThat(HierarchicalShardSyncer.determineNewLeasesToCreate(shards, leases, INITIAL_POSITION_LATEST).isEmpty(), assertThat(HierarchicalShardSyncer.determineNewLeasesToCreate(emptyLeaseTableSynchronizer, shards, leases,
equalTo(true)); INITIAL_POSITION_LATEST).isEmpty(), equalTo(true));
} }
/** /**
@ -123,10 +125,11 @@ public class HierarchicalShardSyncerTest {
@Test public void testDetermineNewLeasesToCreateNoShardsForMultiStream() { @Test public void testDetermineNewLeasesToCreateNoShardsForMultiStream() {
final List<Shard> shards = Collections.emptyList(); final List<Shard> shards = Collections.emptyList();
final List<Lease> leases = Collections.emptyList(); final List<Lease> leases = Collections.emptyList();
final HierarchicalShardSyncer.LeaseSynchronizer emptyLeaseTableSynchronizer = new HierarchicalShardSyncer.EmptyLeaseTableSynchronizer();
assertThat(HierarchicalShardSyncer assertThat(HierarchicalShardSyncer
.determineNewLeasesToCreate(shards, leases, INITIAL_POSITION_LATEST, new HashSet<>(), MULTI_STREAM_ARGS) .determineNewLeasesToCreate(emptyLeaseTableSynchronizer, shards, leases, INITIAL_POSITION_LATEST,
.isEmpty(), equalTo(true)); new HashSet<>(), MULTI_STREAM_ARGS).isEmpty(), equalTo(true));
} }
/** /**
@ -141,9 +144,10 @@ public class HierarchicalShardSyncerTest {
final List<Shard> shards = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange), final List<Shard> shards = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange),
ShardObjectHelper.newShard(shardId1, null, null, sequenceRange)); ShardObjectHelper.newShard(shardId1, null, null, sequenceRange));
final List<Lease> currentLeases = Collections.emptyList(); final List<Lease> currentLeases = Collections.emptyList();
final HierarchicalShardSyncer.LeaseSynchronizer emptyLeaseTableSynchronizer = new HierarchicalShardSyncer.EmptyLeaseTableSynchronizer();
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(emptyLeaseTableSynchronizer,
INITIAL_POSITION_LATEST); shards, currentLeases, INITIAL_POSITION_LATEST);
final Set<String> newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set<String> newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final Set<String> expectedLeaseShardIds = new HashSet<>(Arrays.asList(shardId0, shardId1)); final Set<String> expectedLeaseShardIds = new HashSet<>(Arrays.asList(shardId0, shardId1));
@ -163,9 +167,10 @@ public class HierarchicalShardSyncerTest {
final List<Shard> shards = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange), final List<Shard> shards = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange),
ShardObjectHelper.newShard(shardId1, null, null, sequenceRange)); ShardObjectHelper.newShard(shardId1, null, null, sequenceRange));
final List<Lease> currentLeases = Collections.emptyList(); final List<Lease> currentLeases = Collections.emptyList();
final HierarchicalShardSyncer.LeaseSynchronizer emptyLeaseTableSynchronizer = new HierarchicalShardSyncer.EmptyLeaseTableSynchronizer();
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(emptyLeaseTableSynchronizer,
INITIAL_POSITION_LATEST, new HashSet<>(), MULTI_STREAM_ARGS); shards, currentLeases, INITIAL_POSITION_LATEST, new HashSet<>(), MULTI_STREAM_ARGS);
final Set<String> newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set<String> newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final Set<String> expectedLeaseIds = new HashSet<>( final Set<String> expectedLeaseIds = new HashSet<>(
toMultiStreamLeaseList(Arrays.asList(shardId0, shardId1))); toMultiStreamLeaseList(Arrays.asList(shardId0, shardId1)));
@ -174,59 +179,6 @@ public class HierarchicalShardSyncerTest {
assertThat(newLeaseKeys, equalTo(expectedLeaseIds)); assertThat(newLeaseKeys, equalTo(expectedLeaseIds));
} }
/**
* Test determineNewLeasesToCreate() where there are no leases and no resharding operations have been performed, but
* one of the shards was marked as inconsistent.
*/
@Test
public void testDetermineNewLeasesToCreate0Leases0Reshards1Inconsistent() {
final String shardId0 = "shardId-0";
final String shardId1 = "shardId-1";
final String shardId2 = "shardId-2";
final SequenceNumberRange sequenceRange = ShardObjectHelper.newSequenceNumberRange("342980", null);
final List<Shard> shards = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange),
ShardObjectHelper.newShard(shardId1, null, null, sequenceRange),
ShardObjectHelper.newShard(shardId2, shardId1, null, sequenceRange));
final List<Lease> currentLeases = Collections.emptyList();
final Set<String> inconsistentShardIds = new HashSet<>(Collections.singletonList(shardId2));
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
INITIAL_POSITION_LATEST, inconsistentShardIds);
final Set<String> newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final Set<String> expectedLeaseShardIds = new HashSet<>(Arrays.asList(shardId0, shardId1));
assertThat(newLeases.size(), equalTo(expectedLeaseShardIds.size()));
assertThat(newLeaseKeys, equalTo(expectedLeaseShardIds));
}
/**
* Test determineNewLeasesToCreate() where there are no leases and no resharding operations have been performed, but
* one of the shards was marked as inconsistent.
*/
@Test
public void testDetermineNewLeasesToCreate0Leases0Reshards1InconsistentMultiStream() {
final String shardId0 = "shardId-0";
final String shardId1 = "shardId-1";
final String shardId2 = "shardId-2";
final SequenceNumberRange sequenceRange = ShardObjectHelper.newSequenceNumberRange("342980", null);
final List<Shard> shards = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange),
ShardObjectHelper.newShard(shardId1, null, null, sequenceRange),
ShardObjectHelper.newShard(shardId2, shardId1, null, sequenceRange));
final List<Lease> currentLeases = Collections.emptyList();
final Set<String> inconsistentShardIds = new HashSet<>(Collections.singletonList(shardId2));
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
INITIAL_POSITION_LATEST, inconsistentShardIds, MULTI_STREAM_ARGS);
final Set<String> newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final Set<String> expectedLeaseShardIds = new HashSet<>(
toMultiStreamLeaseList(Arrays.asList(shardId0, shardId1)));
assertThat(newLeases.size(), equalTo(expectedLeaseShardIds.size()));
assertThat(newLeaseKeys, equalTo(expectedLeaseShardIds));
}
/** /**
* Test bootstrapShardLeases() starting at TRIM_HORIZON ("beginning" of stream) * Test bootstrapShardLeases() starting at TRIM_HORIZON ("beginning" of stream)
*/ */
@ -258,7 +210,7 @@ public class HierarchicalShardSyncerTest {
hierarchicalShardSyncer hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
cleanupLeasesOfCompletedShards, false, SCOPE); garbageCollectLeases, cleanupLeasesOfCompletedShards, false, SCOPE);
final Set<String> expectedShardIds = new HashSet<>( final Set<String> expectedShardIds = new HashSet<>(
Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10")); Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10"));
@ -292,7 +244,7 @@ public class HierarchicalShardSyncerTest {
setupMultiStream(); setupMultiStream();
hierarchicalShardSyncer hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
cleanupLeasesOfCompletedShards, false, SCOPE); garbageCollectLeases, cleanupLeasesOfCompletedShards, false, SCOPE);
final Set<String> expectedShardIds = new HashSet<>( final Set<String> expectedShardIds = new HashSet<>(
toMultiStreamLeaseList(Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10"))); toMultiStreamLeaseList(Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10")));
@ -334,7 +286,7 @@ public class HierarchicalShardSyncerTest {
hierarchicalShardSyncer hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
cleanupLeasesOfCompletedShards, false, SCOPE, latestShards); garbageCollectLeases, cleanupLeasesOfCompletedShards, false, SCOPE, latestShards);
final Set<String> expectedShardIds = new HashSet<>( final Set<String> expectedShardIds = new HashSet<>(
Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10")); Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10"));
@ -370,7 +322,7 @@ public class HierarchicalShardSyncerTest {
setupMultiStream(); setupMultiStream();
hierarchicalShardSyncer hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
cleanupLeasesOfCompletedShards, false, SCOPE, latestShards); garbageCollectLeases, cleanupLeasesOfCompletedShards, false, SCOPE, latestShards);
final Set<String> expectedShardIds = new HashSet<>( final Set<String> expectedShardIds = new HashSet<>(
toMultiStreamLeaseList(Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10"))); toMultiStreamLeaseList(Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10")));
@ -406,7 +358,7 @@ public class HierarchicalShardSyncerTest {
hierarchicalShardSyncer hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
cleanupLeasesOfCompletedShards, false, SCOPE, new ArrayList<Shard>()); garbageCollectLeases, cleanupLeasesOfCompletedShards, false, SCOPE, new ArrayList<Shard>());
final Set<String> expectedShardIds = new HashSet<>(); final Set<String> expectedShardIds = new HashSet<>();
@ -446,7 +398,7 @@ public class HierarchicalShardSyncerTest {
try { try {
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher,
INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, false, SCOPE); INITIAL_POSITION_TRIM_HORIZON, garbageCollectLeases, cleanupLeasesOfCompletedShards, false, SCOPE);
} finally { } finally {
verify(shardDetector).listShards(); verify(shardDetector).listShards();
verify(dynamoDBLeaseRefresher, never()).listLeases(); verify(dynamoDBLeaseRefresher, never()).listLeases();
@ -466,7 +418,7 @@ public class HierarchicalShardSyncerTest {
setupMultiStream(); setupMultiStream();
try { try {
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher,
INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, false, SCOPE); INITIAL_POSITION_TRIM_HORIZON, garbageCollectLeases, cleanupLeasesOfCompletedShards, false, SCOPE);
} finally { } finally {
verify(shardDetector).listShards(); verify(shardDetector).listShards();
verify(dynamoDBLeaseRefresher, never()).listLeases(); verify(dynamoDBLeaseRefresher, never()).listLeases();
@ -501,7 +453,7 @@ public class HierarchicalShardSyncerTest {
hierarchicalShardSyncer hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
cleanupLeasesOfCompletedShards, true, SCOPE); garbageCollectLeases, cleanupLeasesOfCompletedShards, true, SCOPE);
final List<Lease> leases = leaseCaptor.getAllValues(); final List<Lease> leases = leaseCaptor.getAllValues();
final Set<String> leaseKeys = leases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set<String> leaseKeys = leases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
@ -546,7 +498,7 @@ public class HierarchicalShardSyncerTest {
setupMultiStream(); setupMultiStream();
hierarchicalShardSyncer hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
cleanupLeasesOfCompletedShards, true, SCOPE); garbageCollectLeases, cleanupLeasesOfCompletedShards, true, SCOPE);
final List<Lease> leases = leaseCaptor.getAllValues(); final List<Lease> leases = leaseCaptor.getAllValues();
final Set<String> leaseKeys = leases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set<String> leaseKeys = leases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
@ -602,7 +554,7 @@ public class HierarchicalShardSyncerTest {
// Initial call: No leases present, create leases. // Initial call: No leases present, create leases.
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
final Set<Lease> createLeases = new HashSet<>(leaseCreateCaptor.getAllValues()); final Set<Lease> createLeases = new HashSet<>(leaseCreateCaptor.getAllValues());
@ -617,7 +569,7 @@ public class HierarchicalShardSyncerTest {
// Second call: Leases present, with shardId-0 being at ShardEnd causing cleanup. // Second call: Leases present, with shardId-0 being at ShardEnd causing cleanup.
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
final List<Lease> deleteLeases = leaseDeleteCaptor.getAllValues(); final List<Lease> deleteLeases = leaseDeleteCaptor.getAllValues();
final Set<String> shardIds = deleteLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set<String> shardIds = deleteLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final Set<ExtendedSequenceNumber> sequenceNumbers = deleteLeases.stream().map(Lease::checkpoint) final Set<ExtendedSequenceNumber> sequenceNumbers = deleteLeases.stream().map(Lease::checkpoint)
@ -677,7 +629,7 @@ public class HierarchicalShardSyncerTest {
// Initial call: Call to create leases. // Initial call: Call to create leases.
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
final Set<Lease> createLeases = new HashSet<>(leaseCreateCaptor.getAllValues()); final Set<Lease> createLeases = new HashSet<>(leaseCreateCaptor.getAllValues());
@ -694,7 +646,7 @@ public class HierarchicalShardSyncerTest {
// Second call: Leases already present. ShardId-0 is at ShardEnd and needs to be cleaned up. Delete fails. // Second call: Leases already present. ShardId-0 is at ShardEnd and needs to be cleaned up. Delete fails.
hierarchicalShardSyncer hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
} finally { } finally {
List<Lease> deleteLeases = leaseDeleteCaptor.getAllValues(); List<Lease> deleteLeases = leaseDeleteCaptor.getAllValues();
Set<String> shardIds = deleteLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); Set<String> shardIds = deleteLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
@ -718,7 +670,7 @@ public class HierarchicalShardSyncerTest {
// Final call: Leases already present. ShardId-0 is at ShardEnd and needs to be cleaned up. Delete passes. // Final call: Leases already present. ShardId-0 is at ShardEnd and needs to be cleaned up. Delete passes.
hierarchicalShardSyncer hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
deleteLeases = leaseDeleteCaptor.getAllValues(); deleteLeases = leaseDeleteCaptor.getAllValues();
@ -779,7 +731,7 @@ public class HierarchicalShardSyncerTest {
// Initial call: Call to create leases. Fails on ListLeases // Initial call: Call to create leases. Fails on ListLeases
hierarchicalShardSyncer hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
} finally { } finally {
verify(shardDetector, times(1)).listShards(); verify(shardDetector, times(1)).listShards();
verify(dynamoDBLeaseRefresher, times(1)).listLeases(); verify(dynamoDBLeaseRefresher, times(1)).listLeases();
@ -789,7 +741,7 @@ public class HierarchicalShardSyncerTest {
// Second call: Leases not present, leases will be created. // Second call: Leases not present, leases will be created.
hierarchicalShardSyncer hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
final Set<Lease> createLeases = new HashSet<>(leaseCreateCaptor.getAllValues()); final Set<Lease> createLeases = new HashSet<>(leaseCreateCaptor.getAllValues());
final Set<Lease> expectedCreateLeases = new HashSet<>(createLeasesFromShards(shards, sequenceNumber, null)); final Set<Lease> expectedCreateLeases = new HashSet<>(createLeasesFromShards(shards, sequenceNumber, null));
@ -804,7 +756,7 @@ public class HierarchicalShardSyncerTest {
// Final call: Leases present, belongs to TestOwner, shardId-0 is at ShardEnd should be cleaned up. // Final call: Leases present, belongs to TestOwner, shardId-0 is at ShardEnd should be cleaned up.
hierarchicalShardSyncer hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
final List<Lease> deleteLeases = leaseDeleteCaptor.getAllValues(); final List<Lease> deleteLeases = leaseDeleteCaptor.getAllValues();
final Set<String> shardIds = deleteLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set<String> shardIds = deleteLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
@ -870,7 +822,7 @@ public class HierarchicalShardSyncerTest {
// Initial call: No leases present, create leases. Create lease Fails // Initial call: No leases present, create leases. Create lease Fails
hierarchicalShardSyncer hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
} finally { } finally {
verify(shardDetector, times(1)).listShards(); verify(shardDetector, times(1)).listShards();
verify(dynamoDBLeaseRefresher, times(1)).listLeases(); verify(dynamoDBLeaseRefresher, times(1)).listLeases();
@ -879,7 +831,7 @@ public class HierarchicalShardSyncerTest {
hierarchicalShardSyncer hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
final Set<Lease> createLeases = new HashSet<>(leaseCreateCaptor.getAllValues()); final Set<Lease> createLeases = new HashSet<>(leaseCreateCaptor.getAllValues());
final Set<Lease> expectedCreateLeases = new HashSet<>(createLeasesFromShards(shards, sequenceNumber, null)); final Set<Lease> expectedCreateLeases = new HashSet<>(createLeasesFromShards(shards, sequenceNumber, null));
@ -894,7 +846,7 @@ public class HierarchicalShardSyncerTest {
// Final call: Leases are present, shardId-0 is at ShardEnd needs to be cleaned up. // Final call: Leases are present, shardId-0 is at ShardEnd needs to be cleaned up.
hierarchicalShardSyncer hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
final List<Lease> deleteLeases = leaseDeleteCaptor.getAllValues(); final List<Lease> deleteLeases = leaseDeleteCaptor.getAllValues();
final Set<String> shardIds = deleteLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set<String> shardIds = deleteLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
@ -984,7 +936,7 @@ public class HierarchicalShardSyncerTest {
doNothing().when(dynamoDBLeaseRefresher).deleteLease(leaseCaptor.capture()); doNothing().when(dynamoDBLeaseRefresher).deleteLease(leaseCaptor.capture());
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher,
INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); INITIAL_POSITION_TRIM_HORIZON, garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
assertThat(leaseCaptor.getAllValues().size(), equalTo(1)); assertThat(leaseCaptor.getAllValues().size(), equalTo(1));
assertThat(leaseCaptor.getValue(), equalTo(garbageLease)); assertThat(leaseCaptor.getValue(), equalTo(garbageLease));
@ -1013,7 +965,7 @@ public class HierarchicalShardSyncerTest {
doNothing().when(dynamoDBLeaseRefresher).deleteLease(leaseCaptor.capture()); doNothing().when(dynamoDBLeaseRefresher).deleteLease(leaseCaptor.capture());
setupMultiStream(); setupMultiStream();
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher,
INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); INITIAL_POSITION_TRIM_HORIZON, garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
assertThat(leaseCaptor.getAllValues().size(), equalTo(1)); assertThat(leaseCaptor.getAllValues().size(), equalTo(1));
assertThat(leaseCaptor.getValue(), equalTo(garbageLease)); assertThat(leaseCaptor.getValue(), equalTo(garbageLease));
@ -1045,7 +997,7 @@ public class HierarchicalShardSyncerTest {
hierarchicalShardSyncer hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, initialPosition, .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, initialPosition,
cleanupLeasesOfCompletedShards, false, SCOPE); garbageCollectLeases, cleanupLeasesOfCompletedShards, false, SCOPE);
final List<Lease> leases = leaseCaptor.getAllValues(); final List<Lease> leases = leaseCaptor.getAllValues();
final Set<String> leaseKeys = leases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set<String> leaseKeys = leases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
@ -1069,6 +1021,7 @@ public class HierarchicalShardSyncerTest {
final String shardId0 = "shardId-0"; final String shardId0 = "shardId-0";
final String shardId1 = "shardId-1"; final String shardId1 = "shardId-1";
final List<Lease> currentLeases = new ArrayList<>(); final List<Lease> currentLeases = new ArrayList<>();
final HierarchicalShardSyncer.LeaseSynchronizer emptyLeaseTableSynchronizer = new HierarchicalShardSyncer.EmptyLeaseTableSynchronizer();
final SequenceNumberRange sequenceRange = ShardObjectHelper.newSequenceNumberRange("342980", null); final SequenceNumberRange sequenceRange = ShardObjectHelper.newSequenceNumberRange("342980", null);
final List<Shard> shards = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange), final List<Shard> shards = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange),
@ -1080,7 +1033,7 @@ public class HierarchicalShardSyncerTest {
final Set<String> expectedLeaseShardIds = new HashSet<>(Arrays.asList(shardId0, shardId1)); final Set<String> expectedLeaseShardIds = new HashSet<>(Arrays.asList(shardId0, shardId1));
for (InitialPositionInStreamExtended initialPosition : initialPositions) { for (InitialPositionInStreamExtended initialPosition : initialPositions) {
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(emptyLeaseTableSynchronizer, shards, currentLeases,
initialPosition); initialPosition);
assertThat(newLeases.size(), equalTo(2)); assertThat(newLeases.size(), equalTo(2));
@ -1092,24 +1045,6 @@ public class HierarchicalShardSyncerTest {
} }
} }
@Test
public void testDetermineNewLeasesToCreateIgnoreClosedShard() {
final String lastShardId = "shardId-1";
final List<Lease> currentLeases = new ArrayList<>();
final List<Shard> shards = Arrays.asList(
ShardObjectHelper.newShard("shardId-0", null, null,
ShardObjectHelper.newSequenceNumberRange("303", "404")),
ShardObjectHelper.newShard(lastShardId, null, null,
ShardObjectHelper.newSequenceNumberRange("405", null)));
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
INITIAL_POSITION_LATEST);
assertThat(newLeases.size(), equalTo(1));
assertThat(newLeases.get(0).leaseKey(), equalTo(lastShardId));
}
// /** // /**
// * Test CheckIfDescendantAndAddNewLeasesForAncestors (initial position Latest) // * Test CheckIfDescendantAndAddNewLeasesForAncestors (initial position Latest)
// * Shard structure (each level depicts a stream segment): // * Shard structure (each level depicts a stream segment):
@ -1125,9 +1060,15 @@ public class HierarchicalShardSyncerTest {
final List<Shard> shards = constructShardListForGraphA(); final List<Shard> shards = constructShardListForGraphA();
final List<Lease> currentLeases = Arrays.asList(newLease("shardId-3"), newLease("shardId-4"), final List<Lease> currentLeases = Arrays.asList(newLease("shardId-3"), newLease("shardId-4"),
newLease("shardId-5")); newLease("shardId-5"));
final Map<String, Shard> shardIdToShardMap = HierarchicalShardSyncer.constructShardIdToShardMap(shards);
final Map<String, Set<String>> shardIdToChildShardIdsMap = HierarchicalShardSyncer
.constructShardIdToChildShardIdsMap(shardIdToShardMap);
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, final HierarchicalShardSyncer.LeaseSynchronizer nonEmptyLeaseTableSynchronizer =
INITIAL_POSITION_LATEST); new HierarchicalShardSyncer.NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap);
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(nonEmptyLeaseTableSynchronizer,
shards, currentLeases, INITIAL_POSITION_LATEST);
final Map<String, ExtendedSequenceNumber> expectedShardIdCheckpointMap = new HashMap<>(); final Map<String, ExtendedSequenceNumber> expectedShardIdCheckpointMap = new HashMap<>();
expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.TRIM_HORIZON); expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.TRIM_HORIZON);
@ -1161,8 +1102,15 @@ public class HierarchicalShardSyncerTest {
final List<Lease> currentLeases = Arrays.asList(newLease("shardId-4"), newLease("shardId-5"), final List<Lease> currentLeases = Arrays.asList(newLease("shardId-4"), newLease("shardId-5"),
newLease("shardId-7")); newLease("shardId-7"));
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, final Map<String, Shard> shardIdToShardMap = HierarchicalShardSyncer.constructShardIdToShardMap(shards);
INITIAL_POSITION_LATEST); final Map<String, Set<String>> shardIdToChildShardIdsMap = HierarchicalShardSyncer
.constructShardIdToChildShardIdsMap(shardIdToShardMap);
final HierarchicalShardSyncer.LeaseSynchronizer nonEmptyLeaseTableSynchronizer =
new HierarchicalShardSyncer.NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap);
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(nonEmptyLeaseTableSynchronizer,
shards, currentLeases, INITIAL_POSITION_LATEST);
final Map<String, ExtendedSequenceNumber> expectedShardIdCheckpointMap = new HashMap<>(); final Map<String, ExtendedSequenceNumber> expectedShardIdCheckpointMap = new HashMap<>();
expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.TRIM_HORIZON); expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.TRIM_HORIZON);
@ -1194,8 +1142,15 @@ public class HierarchicalShardSyncerTest {
final List<Lease> currentLeases = Arrays.asList(newLease("shardId-3"), newLease("shardId-4"), final List<Lease> currentLeases = Arrays.asList(newLease("shardId-3"), newLease("shardId-4"),
newLease("shardId-5")); newLease("shardId-5"));
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, final Map<String, Shard> shardIdToShardMap = HierarchicalShardSyncer.constructShardIdToShardMap(shards);
INITIAL_POSITION_TRIM_HORIZON); final Map<String, Set<String>> shardIdToChildShardIdsMap = HierarchicalShardSyncer
.constructShardIdToChildShardIdsMap(shardIdToShardMap);
final HierarchicalShardSyncer.LeaseSynchronizer nonEmptyLeaseTableSynchronizer =
new HierarchicalShardSyncer.NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap);
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(nonEmptyLeaseTableSynchronizer,
shards, currentLeases, INITIAL_POSITION_TRIM_HORIZON);
final Set<String> leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set<String> leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final List<ExtendedSequenceNumber> checkpoints = newLeases.stream().map(Lease::checkpoint) final List<ExtendedSequenceNumber> checkpoints = newLeases.stream().map(Lease::checkpoint)
@ -1229,8 +1184,15 @@ public class HierarchicalShardSyncerTest {
final List<Lease> currentLeases = Arrays.asList(newLease("shardId-4"), newLease("shardId-5"), final List<Lease> currentLeases = Arrays.asList(newLease("shardId-4"), newLease("shardId-5"),
newLease("shardId-7")); newLease("shardId-7"));
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, final Map<String, Shard> shardIdToShardMap = HierarchicalShardSyncer.constructShardIdToShardMap(shards);
INITIAL_POSITION_TRIM_HORIZON); final Map<String, Set<String>> shardIdToChildShardIdsMap = HierarchicalShardSyncer
.constructShardIdToChildShardIdsMap(shardIdToShardMap);
final HierarchicalShardSyncer.LeaseSynchronizer nonEmptyLeaseTableSynchronizer =
new HierarchicalShardSyncer.NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap);
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(nonEmptyLeaseTableSynchronizer,
shards, currentLeases, INITIAL_POSITION_TRIM_HORIZON);
final Set<String> leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set<String> leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final List<ExtendedSequenceNumber> checkpoints = newLeases.stream().map(Lease::checkpoint) final List<ExtendedSequenceNumber> checkpoints = newLeases.stream().map(Lease::checkpoint)
@ -1259,8 +1221,15 @@ public class HierarchicalShardSyncerTest {
final List<Shard> shards = constructShardListForGraphB(); final List<Shard> shards = constructShardListForGraphB();
final List<Lease> currentLeases = new ArrayList<>(); final List<Lease> currentLeases = new ArrayList<>();
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, final Map<String, Shard> shardIdToShardMap = HierarchicalShardSyncer.constructShardIdToShardMap(shards);
INITIAL_POSITION_TRIM_HORIZON); final Map<String, Set<String>> shardIdToChildShardIdsMap = HierarchicalShardSyncer
.constructShardIdToChildShardIdsMap(shardIdToShardMap);
final HierarchicalShardSyncer.LeaseSynchronizer nonEmptyLeaseTableSynchronizer =
new HierarchicalShardSyncer.NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap);
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(nonEmptyLeaseTableSynchronizer,
shards, currentLeases, INITIAL_POSITION_TRIM_HORIZON);
final Set<String> leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set<String> leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final List<ExtendedSequenceNumber> checkpoints = newLeases.stream().map(Lease::checkpoint) final List<ExtendedSequenceNumber> checkpoints = newLeases.stream().map(Lease::checkpoint)
@ -1294,8 +1263,15 @@ public class HierarchicalShardSyncerTest {
final List<Lease> currentLeases = Arrays.asList(newLease("shardId-3"), newLease("shardId-4"), final List<Lease> currentLeases = Arrays.asList(newLease("shardId-3"), newLease("shardId-4"),
newLease("shardId-5")); newLease("shardId-5"));
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, final Map<String, Shard> shardIdToShardMap = HierarchicalShardSyncer.constructShardIdToShardMap(shards);
INITIAL_POSITION_AT_TIMESTAMP); final Map<String, Set<String>> shardIdToChildShardIdsMap = HierarchicalShardSyncer
.constructShardIdToChildShardIdsMap(shardIdToShardMap);
final HierarchicalShardSyncer.LeaseSynchronizer nonEmptyLeaseTableSynchronizer =
new HierarchicalShardSyncer.NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap);
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(nonEmptyLeaseTableSynchronizer,
shards, currentLeases, INITIAL_POSITION_AT_TIMESTAMP);
final Set<String> leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set<String> leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final List<ExtendedSequenceNumber> checkpoints = newLeases.stream().map(Lease::checkpoint) final List<ExtendedSequenceNumber> checkpoints = newLeases.stream().map(Lease::checkpoint)
.collect(Collectors.toList()); .collect(Collectors.toList());
@ -1328,8 +1304,15 @@ public class HierarchicalShardSyncerTest {
final List<Lease> currentLeases = Arrays.asList(newLease("shardId-4"), newLease("shardId-5"), final List<Lease> currentLeases = Arrays.asList(newLease("shardId-4"), newLease("shardId-5"),
newLease("shardId-7")); newLease("shardId-7"));
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, final Map<String, Shard> shardIdToShardMap = HierarchicalShardSyncer.constructShardIdToShardMap(shards);
INITIAL_POSITION_AT_TIMESTAMP); final Map<String, Set<String>> shardIdToChildShardIdsMap = HierarchicalShardSyncer
.constructShardIdToChildShardIdsMap(shardIdToShardMap);
final HierarchicalShardSyncer.LeaseSynchronizer nonEmptyLeaseTableSynchronizer =
new HierarchicalShardSyncer.NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap);
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(nonEmptyLeaseTableSynchronizer,
shards, currentLeases, INITIAL_POSITION_AT_TIMESTAMP);
final Set<String> leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set<String> leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final List<ExtendedSequenceNumber> checkpoints = newLeases.stream().map(Lease::checkpoint) final List<ExtendedSequenceNumber> checkpoints = newLeases.stream().map(Lease::checkpoint)
.collect(Collectors.toList()); .collect(Collectors.toList());
@ -1355,8 +1338,15 @@ public class HierarchicalShardSyncerTest {
final List<Shard> shards = constructShardListForGraphB(); final List<Shard> shards = constructShardListForGraphB();
final List<Lease> currentLeases = new ArrayList<>(); final List<Lease> currentLeases = new ArrayList<>();
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, final Map<String, Shard> shardIdToShardMap = HierarchicalShardSyncer.constructShardIdToShardMap(shards);
INITIAL_POSITION_AT_TIMESTAMP); final Map<String, Set<String>> shardIdToChildShardIdsMap = HierarchicalShardSyncer
.constructShardIdToChildShardIdsMap(shardIdToShardMap);
final HierarchicalShardSyncer.LeaseSynchronizer nonEmptyLeaseTableSynchronizer =
new HierarchicalShardSyncer.NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap);
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(nonEmptyLeaseTableSynchronizer,
shards, currentLeases, INITIAL_POSITION_AT_TIMESTAMP);
final Set<String> leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set<String> leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final List<ExtendedSequenceNumber> checkpoints = newLeases.stream().map(Lease::checkpoint) final List<ExtendedSequenceNumber> checkpoints = newLeases.stream().map(Lease::checkpoint)
.collect(Collectors.toList()); .collect(Collectors.toList());

View file

@ -118,7 +118,8 @@ public class ShardSyncTaskIntegrationTest {
leaseRefresher.deleteAll(); leaseRefresher.deleteAll();
Set<String> shardIds = shardDetector.listShards().stream().map(Shard::shardId).collect(Collectors.toSet()); Set<String> shardIds = shardDetector.listShards().stream().map(Shard::shardId).collect(Collectors.toSet());
ShardSyncTask syncTask = new ShardSyncTask(shardDetector, leaseRefresher, ShardSyncTask syncTask = new ShardSyncTask(shardDetector, leaseRefresher,
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST), false, false, 0L, InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST),
false, true, false, 0L,
hierarchicalShardSyncer, NULL_METRICS_FACTORY); hierarchicalShardSyncer, NULL_METRICS_FACTORY);
syncTask.call(); syncTask.call();
List<Lease> leases = leaseRefresher.listLeases(); List<Lease> leases = leaseRefresher.listLeases();

View file

@ -18,6 +18,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
@ -136,7 +137,7 @@ public class ShutdownTaskTest {
throw new KinesisClientLibIOException("KinesisClientLibIOException"); throw new KinesisClientLibIOException("KinesisClientLibIOException");
}).when(hierarchicalShardSyncer) }).when(hierarchicalShardSyncer)
.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, INITIAL_POSITION_TRIM_HORIZON, .checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, INITIAL_POSITION_TRIM_HORIZON,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, true, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards,
NULL_METRICS_FACTORY.createMetrics(), latestShards); NULL_METRICS_FACTORY.createMetrics(), latestShards);
final TaskResult result = task.call(); final TaskResult result = task.call();