Merge pull request #2 from ashwing/EmptyLeaseTable

Adding empty lease table sync using ListShards w/ ShardFilter.
This commit is contained in:
Joshua Kim 2020-03-30 18:48:56 -04:00 committed by GitHub
commit 297b09481c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 566 additions and 208 deletions

View file

@ -24,12 +24,14 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.experimental.Accessors;
import org.apache.commons.lang3.StringUtils;
@ -38,6 +40,8 @@ import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardFilter;
import software.amazon.awssdk.services.kinesis.model.ShardFilterType;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.InitialPositionInStream;
@ -85,9 +89,10 @@ public class HierarchicalShardSyncer {
* @param shardDetector
* @param leaseRefresher
* @param initialPosition
* @param scope
* @param cleanupLeasesOfCompletedShards
* @param ignoreUnexpectedChildShards
* @param scope
* @param garbageCollectLeases
* @throws DependencyException
* @throws InvalidStateException
* @throws ProvisionedThroughputException
@ -96,20 +101,24 @@ public class HierarchicalShardSyncer {
// CHECKSTYLE:OFF CyclomaticComplexity
public synchronized void checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector,
final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition,
final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards,
final MetricsScope scope) throws DependencyException, InvalidStateException,
ProvisionedThroughputException, KinesisClientLibIOException {
final List<Shard> latestShards = getShardList(shardDetector);
checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards, scope, latestShards);
final MetricsScope scope, final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards,
final boolean garbageCollectLeases, final boolean isLeaseTableEmpty)
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
final List<Shard> latestShards = isLeaseTableEmpty ?
getShardListAtInitialPosition(shardDetector, initialPosition) : getShardList(shardDetector);
checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, latestShards, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, garbageCollectLeases,
isLeaseTableEmpty);
}
//Provide a pre-collcted list of shards to avoid calling ListShards API
public synchronized void checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector,
final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition, final boolean cleanupLeasesOfCompletedShards,
final boolean ignoreUnexpectedChildShards, final MetricsScope scope, List<Shard> latestShards)
throws DependencyException, InvalidStateException,
ProvisionedThroughputException, KinesisClientLibIOException {
final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition,
List<Shard> latestShards, final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards,
final MetricsScope scope, final boolean garbageCollectLeases, final boolean isLeaseTableEmpty)
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
//TODO: Need to add multistream support for this https://sim.amazon.com/issues/KinesisLTR-191
if (!CollectionUtils.isNullOrEmpty(latestShards)) {
log.debug("Num shards: {}", latestShards.size());
}
@ -125,8 +134,10 @@ public class HierarchicalShardSyncer {
getLeasesForStream(shardDetector.streamIdentifier(), leaseRefresher) :
leaseRefresher.listLeases();
final MultiStreamArgs multiStreamArgs = new MultiStreamArgs(isMultiStreamMode, shardDetector.streamIdentifier());
final List<Lease> newLeasesToCreate = determineNewLeasesToCreate(latestShards, currentLeases, initialPosition,
inconsistentShardIds, multiStreamArgs);
final LeaseSynchronizer leaseSynchronizer = isLeaseTableEmpty ? new EmptyLeaseTableSynchronizer() :
new NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap);
final List<Lease> newLeasesToCreate = determineNewLeasesToCreate(leaseSynchronizer, latestShards, currentLeases,
initialPosition, inconsistentShardIds, multiStreamArgs);
log.debug("Num new leases to create: {}", newLeasesToCreate.size());
for (Lease lease : newLeasesToCreate) {
long startTime = System.currentTimeMillis();
@ -140,8 +151,10 @@ public class HierarchicalShardSyncer {
}
final List<Lease> trackedLeases = new ArrayList<>(currentLeases);
trackedLeases.addAll(newLeasesToCreate);
cleanupGarbageLeases(shardDetector, latestShards, trackedLeases, leaseRefresher, multiStreamArgs);
if (cleanupLeasesOfCompletedShards) {
if (!isLeaseTableEmpty && garbageCollectLeases) {
cleanupGarbageLeases(shardDetector, latestShards, trackedLeases, leaseRefresher, multiStreamArgs);
}
if (!isLeaseTableEmpty && cleanupLeasesOfCompletedShards) {
cleanupLeasesOfFinishedShards(currentLeases, shardIdToShardMap, shardIdToChildShardIdsMap, trackedLeases,
leaseRefresher, multiStreamArgs);
}
@ -299,55 +312,69 @@ public class HierarchicalShardSyncer {
return shardIdToChildShardIdsMap;
}
private static List<Shard> getShardList(@NonNull final ShardDetector shardDetector) throws KinesisClientLibIOException {
final List<Shard> shards = shardDetector.listShards();
if (shards == null) {
throw new KinesisClientLibIOException(
"Stream is not in ACTIVE OR UPDATING state - will retry getting the shard list.");
/**
* Helper method to resolve the correct shard filter to use when listing shards from a position in a stream.
* @param initialPositionInStreamExtended
* @return ShardFilter shard filter for the corresponding position in the stream.
*/
private static ShardFilter getShardFilterFromInitialPosition(InitialPositionInStreamExtended initialPositionInStreamExtended) {
ShardFilter.Builder builder = ShardFilter.builder();
switch (initialPositionInStreamExtended.getInitialPositionInStream()) {
case LATEST:
builder = builder.type(ShardFilterType.AT_LATEST);
break;
case TRIM_HORIZON:
builder = builder.type(ShardFilterType.AT_TRIM_HORIZON);
break;
case AT_TIMESTAMP:
builder = builder.type(ShardFilterType.AT_TIMESTAMP).timestamp(initialPositionInStreamExtended.getTimestamp().toInstant());
break;
}
return shards;
return builder.build();
}
private static List<Shard> getShardListAtInitialPosition(@NonNull final ShardDetector shardDetector,
InitialPositionInStreamExtended initialPositionInStreamExtended) throws KinesisClientLibIOException {
final ShardFilter shardFilter = getShardFilterFromInitialPosition(initialPositionInStreamExtended);
final Optional<List<Shard>> shards = Optional.of(shardDetector.listShardsWithFilter(shardFilter));
return shards.orElseThrow(() -> new KinesisClientLibIOException("Stream " + shardDetector.streamIdentifier().streamName() +
" is not in ACTIVE OR UPDATING state - will retry getting the shard list."));
}
private static List<Shard> getShardList(@NonNull final ShardDetector shardDetector) throws KinesisClientLibIOException {
final Optional<List<Shard>> shards = Optional.of(shardDetector.listShards());
return shards.orElseThrow(() -> new KinesisClientLibIOException("Stream " + shardDetector.streamIdentifier().streamName() +
" is not in ACTIVE OR UPDATING state - will retry getting the shard list."));
}
/**
* Determine new leases to create and their initial checkpoint.
* Note: Package level access only for testing purposes.
*
* For each open (no ending sequence number) shard without open parents that doesn't already have a lease,
* determine if it is a descendent of any shard which is or will be processed (e.g. for which a lease exists):
* If so, set checkpoint of the shard to TrimHorizon and also create leases for ancestors if needed.
* If not, set checkpoint of the shard to the initial position specified by the client.
* To check if we need to create leases for ancestors, we use the following rules:
* * If we began (or will begin) processing data for a shard, then we must reach end of that shard before
* we begin processing data from any of its descendants.
* * A shard does not start processing data until data from all its parents has been processed.
* Note, if the initial position is LATEST and a shard has two parents and only one is a descendant - we'll create
* leases corresponding to both the parents - the parent shard which is not a descendant will have
* its checkpoint set to Latest.
*
* We assume that if there is an existing lease for a shard, then either:
* * we have previously created a lease for its parent (if it was needed), or
* * the parent shard has expired.
*
* For example:
* Shard structure (each level depicts a stream segment):
* 0 1 2 3 4 5 - shards till epoch 102
* \ / \ / | |
* 6 7 4 5 - shards from epoch 103 - 205
* \ / | / \
* 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
* Current leases: (3, 4, 5)
* New leases to create: (2, 6, 7, 8, 9, 10)
*
* The leases returned are sorted by the starting sequence number - following the same order
* when persisting the leases in DynamoDB will ensure that we recover gracefully if we fail
* before creating all the leases.
* @param leaseSynchronizer determines the strategy we'll be using to update any new leases.
* @param shards List of all shards in Kinesis (we'll create new leases based on this set)
* @param currentLeases List of current leases
* @param initialPosition One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. We'll start fetching records from that
* location in the shard (when an application starts up for the first time - and there are no checkpoints).
* @param inconsistentShardIds Set of child shard ids having open parents.
* @param multiStreamArgs determines if we are using multistream mode.
* @return List of new leases to create sorted by starting sequenceNumber of the corresponding shard
*/
static List<Lease> determineNewLeasesToCreate(final LeaseSynchronizer leaseSynchronizer, final List<Shard> shards,
final List<Lease> currentLeases, final InitialPositionInStreamExtended initialPosition,
final Set<String> inconsistentShardIds, final MultiStreamArgs multiStreamArgs) {
return leaseSynchronizer.determineNewLeasesToCreate(shards, currentLeases, initialPosition, inconsistentShardIds, multiStreamArgs);
}
/**
* Determine new leases to create and their initial checkpoint.
* Note: Package level access only for testing purposes.
*
* If a shard has no existing lease, is open, and is a descendant of a parent which is still open, we ignore it
* here; this happens when the list of shards is inconsistent, which could be due to pagination delay for very
* high shard count streams (i.e., dynamodb streams for tables with thousands of partitions). This can only
* currently happen here if ignoreUnexpectedChildShards was true in syncShardleases.
*
*
* @param leaseSynchronizer determines the strategy we'll be using to update any new leases.
* @param shards List of all shards in Kinesis (we'll create new leases based on this set)
* @param currentLeases List of current leases
* @param initialPosition One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. We'll start fetching records from that
@ -355,92 +382,27 @@ public class HierarchicalShardSyncer {
* @param inconsistentShardIds Set of child shard ids having open parents.
* @return List of new leases to create sorted by starting sequenceNumber of the corresponding shard
*/
static List<Lease> determineNewLeasesToCreate(final List<Shard> shards, final List<Lease> currentLeases,
final InitialPositionInStreamExtended initialPosition, final Set<String> inconsistentShardIds,
final MultiStreamArgs multiStreamArgs) {
final Map<String, Lease> shardIdToNewLeaseMap = new HashMap<>();
final Map<String, Shard> shardIdToShardMapOfAllKinesisShards = constructShardIdToShardMap(shards);
final Set<String> shardIdsOfCurrentLeases = currentLeases.stream()
.peek(lease -> log.debug("Existing lease: {}", lease))
.map(lease -> shardIdFromLeaseDeducer.apply(lease, multiStreamArgs))
.collect(Collectors.toSet());
final List<Shard> openShards = getOpenShards(shards);
final Map<String, Boolean> memoizationContext = new HashMap<>();
// Iterate over the open shards and find those that don't have any lease entries.
for (Shard shard : openShards) {
final String shardId = shard.shardId();
log.debug("Evaluating leases for open shard {} and its ancestors.", shardId);
if (shardIdsOfCurrentLeases.contains(shardId)) {
log.debug("Lease for shardId {} already exists. Not creating a lease", shardId);
} else if (inconsistentShardIds.contains(shardId)) {
log.info("shardId {} is an inconsistent child. Not creating a lease", shardId);
} else {
log.debug("Need to create a lease for shardId {}", shardId);
final Lease newLease = multiStreamArgs.isMultiStreamMode() ?
newKCLMultiStreamLease(shard, multiStreamArgs.streamIdentifier()) :
newKCLLease(shard);
final boolean isDescendant = checkIfDescendantAndAddNewLeasesForAncestors(shardId, initialPosition,
shardIdsOfCurrentLeases, shardIdToShardMapOfAllKinesisShards, shardIdToNewLeaseMap,
memoizationContext, multiStreamArgs);
/**
* If the shard is a descendant and the specified initial position is AT_TIMESTAMP, then the
* checkpoint should be set to AT_TIMESTAMP, else to TRIM_HORIZON. For AT_TIMESTAMP, we will add a
* lease just like we do for TRIM_HORIZON. However we will only return back records with server-side
* timestamp at or after the specified initial position timestamp.
*
* Shard structure (each level depicts a stream segment):
* 0 1 2 3 4 5 - shards till epoch 102
* \ / \ / | |
* 6 7 4 5 - shards from epoch 103 - 205
* \ / | /\
* 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
*
* Current leases: empty set
*
* For the above example, suppose the initial position in stream is set to AT_TIMESTAMP with
* timestamp value 206. We will then create new leases for all the shards (with checkpoint set to
* AT_TIMESTAMP), including the ancestor shards with epoch less than 206. However as we begin
* processing the ancestor shards, their checkpoints would be updated to SHARD_END and their leases
* would then be deleted since they won't have records with server-side timestamp at/after 206. And
* after that we will begin processing the descendant shards with epoch at/after 206 and we will
* return the records that meet the timestamp requirement for these shards.
*/
if (isDescendant
&& !initialPosition.getInitialPositionInStream().equals(InitialPositionInStream.AT_TIMESTAMP)) {
newLease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
} else {
newLease.checkpoint(convertToCheckpoint(initialPosition));
}
log.debug("Set checkpoint of {} to {}", newLease.leaseKey(), newLease.checkpoint());
shardIdToNewLeaseMap.put(shardId, newLease);
}
}
final List<Lease> newLeasesToCreate = new ArrayList<>(shardIdToNewLeaseMap.values());
final Comparator<Lease> startingSequenceNumberComparator = new StartingSequenceNumberAndShardIdBasedComparator(
shardIdToShardMapOfAllKinesisShards, multiStreamArgs);
newLeasesToCreate.sort(startingSequenceNumberComparator);
return newLeasesToCreate;
}
static List<Lease> determineNewLeasesToCreate(final List<Shard> shards, final List<Lease> currentLeases,
final InitialPositionInStreamExtended initialPosition, final Set<String> inconsistentShardIds) {
return determineNewLeasesToCreate(shards, currentLeases, initialPosition, inconsistentShardIds,
static List<Lease> determineNewLeasesToCreate(final LeaseSynchronizer leaseSynchronizer, final List<Shard> shards,
final List<Lease> currentLeases, final InitialPositionInStreamExtended initialPosition,final Set<String> inconsistentShardIds) {
return determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, initialPosition, inconsistentShardIds,
new MultiStreamArgs(false, null));
}
/**
* Determine new leases to create and their initial checkpoint.
* Note: Package level access only for testing purposes.
*
* @param leaseSynchronizer determines the strategy we'll be using to update any new leases.
* @param shards List of all shards in Kinesis (we'll create new leases based on this set)
* @param currentLeases List of current leases
* @param initialPosition One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. We'll start fetching records from that
* location in the shard (when an application starts up for the first time - and there are no checkpoints).
* @return List of new leases to create sorted by starting sequenceNumber of the corresponding shard
*/
static List<Lease> determineNewLeasesToCreate(final List<Shard> shards, final List<Lease> currentLeases,
final InitialPositionInStreamExtended initialPosition) {
static List<Lease> determineNewLeasesToCreate(final LeaseSynchronizer leaseSynchronizer, final List<Shard> shards,
final List<Lease> currentLeases, final InitialPositionInStreamExtended initialPosition) {
final Set<String> inconsistentShardIds = new HashSet<>();
return determineNewLeasesToCreate(shards, currentLeases, initialPosition, inconsistentShardIds);
return determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, initialPosition, inconsistentShardIds);
}
/**
@ -682,6 +644,7 @@ public class HierarchicalShardSyncer {
if (!CollectionUtils.isNullOrEmpty(leasesOfClosedShards)) {
assertClosedShardsAreCoveredOrAbsent(shardIdToShardMap, shardIdToChildShardIdsMap, shardIdsOfClosedShards);
//TODO: Verify before LTR launch that ending sequence number is still returned from the service.
Comparator<? super Lease> startingSequenceNumberComparator = new StartingSequenceNumberAndShardIdBasedComparator(
shardIdToShardMap, multiStreamArgs);
leasesOfClosedShards.sort(startingSequenceNumberComparator);
@ -864,4 +827,210 @@ public class HierarchicalShardSyncer {
private final StreamIdentifier streamIdentifier;
}
/**
* Interface to determine how to create new leases.
*/
@VisibleForTesting
interface LeaseSynchronizer {
/**
* Determines how to create leases.
* @param shards
* @param currentLeases
* @param initialPosition
* @param inconsistentShardIds
* @param multiStreamArgs
* @return
*/
List<Lease> determineNewLeasesToCreate(List<Shard> shards, List<Lease> currentLeases,
InitialPositionInStreamExtended initialPosition, Set<String> inconsistentShardIds,
MultiStreamArgs multiStreamArgs);
}
/**
* Class to help create leases when the table is initially empty.
*/
@Slf4j
@AllArgsConstructor
static class EmptyLeaseTableSynchronizer implements LeaseSynchronizer {
/**
* Determines how to create leases when the lease table is initially empty. For this, we read all shards where
* the KCL is reading from. For any shards which are closed, we will discover their child shards through GetRecords
* child shard information.
*
* @param shards
* @param currentLeases
* @param initialPosition
* @param inconsistentShardIds
* @param multiStreamArgs
* @return
*/
@Override
public List<Lease> determineNewLeasesToCreate(List<Shard> shards, List<Lease> currentLeases,
InitialPositionInStreamExtended initialPosition, Set<String> inconsistentShardIds, MultiStreamArgs multiStreamArgs) {
final Map<String, Shard> shardIdToShardMapOfAllKinesisShards = constructShardIdToShardMap(shards);
currentLeases.stream().peek(lease -> log.debug("Existing lease: {}", lease))
.map(lease -> shardIdFromLeaseDeducer.apply(lease, multiStreamArgs))
.collect(Collectors.toSet());
final List<Lease> newLeasesToCreate = getLeasesToCreateForOpenAndClosedShards(initialPosition, shards, multiStreamArgs);
//TODO: Verify before LTR launch that ending sequence number is still returned from the service.
final Comparator<Lease> startingSequenceNumberComparator =
new StartingSequenceNumberAndShardIdBasedComparator(shardIdToShardMapOfAllKinesisShards, multiStreamArgs);
newLeasesToCreate.sort(startingSequenceNumberComparator);
return newLeasesToCreate;
}
/**
* Helper method to create leases. For an empty lease table, we will be creating leases for all shards
* regardless of if they are open or closed. Closed shards will be unblocked via child shard information upon
* reaching SHARD_END.
*/
private List<Lease> getLeasesToCreateForOpenAndClosedShards(InitialPositionInStreamExtended initialPosition,
List<Shard> shards, MultiStreamArgs multiStreamArgs) {
final Map<String, Lease> shardIdToNewLeaseMap = new HashMap<>();
for (Shard shard : shards) {
final String shardId = shard.shardId();
final Lease lease = multiStreamArgs.isMultiStreamMode() ?
newKCLMultiStreamLease(shard, multiStreamArgs.streamIdentifier) : newKCLLease(shard);
lease.checkpoint(convertToCheckpoint(initialPosition));
log.debug("Need to create a lease for shard with shardId {}", shardId);
shardIdToNewLeaseMap.put(shardId, lease);
}
return new ArrayList(shardIdToNewLeaseMap.values());
}
}
/**
* Class to help create leases when the lease table is not initially empty.
*/
@Slf4j
@AllArgsConstructor
static class NonEmptyLeaseTableSynchronizer implements LeaseSynchronizer {
private final ShardDetector shardDetector;
private final Map<String, Shard> shardIdToShardMap;
private final Map<String, Set<String>> shardIdToChildShardIdsMap;
/**
* Determine new leases to create and their initial checkpoint.
* Note: Package level access only for testing purposes.
* <p>
* For each open (no ending sequence number) shard without open parents that doesn't already have a lease,
* determine if it is a descendent of any shard which is or will be processed (e.g. for which a lease exists):
* If so, set checkpoint of the shard to TrimHorizon and also create leases for ancestors if needed.
* If not, set checkpoint of the shard to the initial position specified by the client.
* To check if we need to create leases for ancestors, we use the following rules:
* * If we began (or will begin) processing data for a shard, then we must reach end of that shard before
* we begin processing data from any of its descendants.
* * A shard does not start processing data until data from all its parents has been processed.
* Note, if the initial position is LATEST and a shard has two parents and only one is a descendant - we'll create
* leases corresponding to both the parents - the parent shard which is not a descendant will have
* its checkpoint set to Latest.
* <p>
* We assume that if there is an existing lease for a shard, then either:
* * we have previously created a lease for its parent (if it was needed), or
* * the parent shard has expired.
* <p>
* For example:
* Shard structure (each level depicts a stream segment):
* 0 1 2 3 4 5 - shards till epoch 102
* \ / \ / | |
* 6 7 4 5 - shards from epoch 103 - 205
* \ / | / \
* 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
* Current leases: (3, 4, 5)
* New leases to create: (2, 6, 7, 8, 9, 10)
* <p>
* The leases returned are sorted by the starting sequence number - following the same order
* when persisting the leases in DynamoDB will ensure that we recover gracefully if we fail
* before creating all the leases.
* <p>
* If a shard has no existing lease, is open, and is a descendant of a parent which is still open, we ignore it
* here; this happens when the list of shards is inconsistent, which could be due to pagination delay for very
* high shard count streams (i.e., dynamodb streams for tables with thousands of partitions). This can only
* currently happen here if ignoreUnexpectedChildShards was true in syncShardleases.
*
* @return List of new leases to create sorted by starting sequenceNumber of the corresponding shard
*/
@Override
public synchronized List<Lease> determineNewLeasesToCreate(List<Shard> shards, List<Lease> currentLeases,
InitialPositionInStreamExtended initialPosition, Set<String> inconsistentShardIds, MultiStreamArgs multiStreamArgs) {
final Map<String, Lease> shardIdToNewLeaseMap = new HashMap<>();
final Map<String, Shard> shardIdToShardMapOfAllKinesisShards = constructShardIdToShardMap(shards);
final Set<String> shardIdsOfCurrentLeases = currentLeases.stream()
.peek(lease -> log.debug("Existing lease: {}", lease))
.map(lease -> shardIdFromLeaseDeducer.apply(lease, multiStreamArgs))
.collect(Collectors.toSet());
final List<Shard> openShards = getOpenShards(shards);
final Map<String, Boolean> memoizationContext = new HashMap<>();
// Iterate over the open shards and find those that don't have any lease entries.
for (Shard shard : openShards) {
final String shardId = shard.shardId();
log.debug("Evaluating leases for open shard {} and its ancestors.", shardId);
if (shardIdsOfCurrentLeases.contains(shardId)) {
log.debug("Lease for shardId {} already exists. Not creating a lease", shardId);
} else if (inconsistentShardIds.contains(shardId)) {
log.info("shardId {} is an inconsistent child. Not creating a lease", shardId);
} else {
log.debug("Need to create a lease for shardId {}", shardId);
final Lease newLease = multiStreamArgs.isMultiStreamMode() ?
newKCLMultiStreamLease(shard, multiStreamArgs.streamIdentifier()) :
newKCLLease(shard);
final boolean isDescendant = checkIfDescendantAndAddNewLeasesForAncestors(shardId, initialPosition,
shardIdsOfCurrentLeases, shardIdToShardMapOfAllKinesisShards, shardIdToNewLeaseMap,
memoizationContext, multiStreamArgs);
/**
* If the shard is a descendant and the specified initial position is AT_TIMESTAMP, then the
* checkpoint should be set to AT_TIMESTAMP, else to TRIM_HORIZON. For AT_TIMESTAMP, we will add a
* lease just like we do for TRIM_HORIZON. However we will only return back records with server-side
* timestamp at or after the specified initial position timestamp.
*
* Shard structure (each level depicts a stream segment):
* 0 1 2 3 4 5 - shards till epoch 102
* \ / \ / | |
* 6 7 4 5 - shards from epoch 103 - 205
* \ / | /\
* 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
*
* Current leases: empty set
*
* For the above example, suppose the initial position in stream is set to AT_TIMESTAMP with
* timestamp value 206. We will then create new leases for all the shards (with checkpoint set to
* AT_TIMESTAMP), including the ancestor shards with epoch less than 206. However as we begin
* processing the ancestor shards, their checkpoints would be updated to SHARD_END and their leases
* would then be deleted since they won't have records with server-side timestamp at/after 206. And
* after that we will begin processing the descendant shards with epoch at/after 206 and we will
* return the records that meet the timestamp requirement for these shards.
*/
if (isDescendant
&& !initialPosition.getInitialPositionInStream().equals(InitialPositionInStream.AT_TIMESTAMP)) {
newLease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
} else {
newLease.checkpoint(convertToCheckpoint(initialPosition));
}
log.debug("Set checkpoint of {} to {}", newLease.leaseKey(), newLease.checkpoint());
shardIdToNewLeaseMap.put(shardId, newLease);
}
}
final List<Lease> newLeasesToCreate = new ArrayList<>(shardIdToNewLeaseMap.values());
//TODO: Verify before LTR launch that ending sequence number is still returned from the service.
final Comparator<Lease> startingSequenceNumberComparator = new StartingSequenceNumberAndShardIdBasedComparator(
shardIdToShardMapOfAllKinesisShards, multiStreamArgs);
newLeasesToCreate.sort(startingSequenceNumberComparator);
return newLeasesToCreate;
}
}
}

View file

@ -42,6 +42,7 @@ import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
import software.amazon.awssdk.services.kinesis.model.ResourceInUseException;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardFilter;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.FutureUtils;
@ -149,12 +150,18 @@ public class KinesisShardDetector implements ShardDetector {
@Override
@Synchronized
public List<Shard> listShards() {
return listShardsWithFilter(null);
}
@Override
@Synchronized
public List<Shard> listShardsWithFilter(ShardFilter shardFilter) {
final List<Shard> shards = new ArrayList<>();
ListShardsResponse result;
String nextToken = null;
do {
result = listShards(nextToken);
result = listShards(shardFilter, nextToken);
if (result == null) {
/*
@ -172,13 +179,13 @@ public class KinesisShardDetector implements ShardDetector {
return shards;
}
private ListShardsResponse listShards(final String nextToken) {
private ListShardsResponse listShards(ShardFilter shardFilter, final String nextToken) {
final AWSExceptionManager exceptionManager = new AWSExceptionManager();
exceptionManager.add(LimitExceededException.class, t -> t);
exceptionManager.add(ResourceInUseException.class, t -> t);
exceptionManager.add(KinesisException.class, t -> t);
ListShardsRequest.Builder request = KinesisRequestsBuilder.listShardsRequestBuilder();
ListShardsRequest.Builder request = KinesisRequestsBuilder.listShardsRequestBuilder().shardFilter(shardFilter);
if (StringUtils.isEmpty(nextToken)) {
request = request.streamName(streamIdentifier.streamName());
} else {

View file

@ -16,6 +16,7 @@
package software.amazon.kinesis.leases;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardFilter;
import software.amazon.kinesis.common.StreamIdentifier;
import java.util.List;
@ -28,6 +29,8 @@ public interface ShardDetector {
List<Shard> listShards();
List<Shard> listShardsWithFilter(ShardFilter shardFilter);
default StreamIdentifier streamIdentifier() {
throw new UnsupportedOperationException("StreamName not available");
}

View file

@ -46,6 +46,7 @@ public class ShardSyncTask implements ConsumerTask {
@NonNull
private final InitialPositionInStreamExtended initialPosition;
private final boolean cleanupLeasesUponShardCompletion;
private final boolean garbageCollectLeases;
private final boolean ignoreUnexpectedChildShards;
private final long shardSyncTaskIdleTimeMillis;
@NonNull
@ -66,8 +67,10 @@ public class ShardSyncTask implements ConsumerTask {
boolean shardSyncSuccess = true;
try {
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition,
cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, scope);
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher,
initialPosition, scope, garbageCollectLeases, ignoreUnexpectedChildShards, cleanupLeasesUponShardCompletion,
leaseRefresher.isLeaseTableEmpty());
if (shardSyncTaskIdleTimeMillis > 0) {
Thread.sleep(shardSyncTaskIdleTimeMillis);
}

View file

@ -48,6 +48,7 @@ public class ShardSyncTaskManager {
@NonNull
private final InitialPositionInStreamExtended initialPositionInStream;
private final boolean cleanupLeasesUponShardCompletion;
private final boolean garbageCollectLeases;
private final boolean ignoreUnexpectedChildShards;
private final long shardSyncIdleTimeMillis;
@NonNull
@ -84,6 +85,7 @@ public class ShardSyncTaskManager {
this.leaseRefresher = leaseRefresher;
this.initialPositionInStream = initialPositionInStream;
this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion;
this.garbageCollectLeases = true;
this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
this.shardSyncIdleTimeMillis = shardSyncIdleTimeMillis;
this.executorService = executorService;
@ -114,6 +116,7 @@ public class ShardSyncTaskManager {
this.leaseRefresher = leaseRefresher;
this.initialPositionInStream = initialPositionInStream;
this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion;
this.garbageCollectLeases = true;
this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
this.shardSyncIdleTimeMillis = shardSyncIdleTimeMillis;
this.executorService = executorService;
@ -128,6 +131,7 @@ public class ShardSyncTaskManager {
leaseRefresher,
initialPositionInStream,
cleanupLeasesUponShardCompletion,
garbageCollectLeases,
ignoreUnexpectedChildShards,
shardSyncIdleTimeMillis,
hierarchicalShardSyncer,
@ -166,6 +170,7 @@ public class ShardSyncTaskManager {
leaseRefresher,
initialPositionInStream,
cleanupLeasesUponShardCompletion,
garbageCollectLeases,
ignoreUnexpectedChildShards,
shardSyncIdleTimeMillis,
hierarchicalShardSyncer,

View file

@ -19,6 +19,7 @@ import software.amazon.kinesis.metrics.MetricsScope;
@Deprecated
public class ShardSyncer {
private static final HierarchicalShardSyncer HIERARCHICAL_SHARD_SYNCER = new HierarchicalShardSyncer();
private static final boolean garbageCollectLeases = true;
/**
* <p>NOTE: This method is deprecated and will be removed in a future release.</p>
@ -41,6 +42,6 @@ public class ShardSyncer {
final MetricsScope scope) throws DependencyException, InvalidStateException, ProvisionedThroughputException,
KinesisClientLibIOException {
HIERARCHICAL_SHARD_SYNCER.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope);
scope, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, leaseRefresher.isLeaseTableEmpty());
}
}

View file

@ -16,7 +16,6 @@ package software.amazon.kinesis.lifecycle;
import com.google.common.annotations.VisibleForTesting;
import com.sun.org.apache.bcel.internal.generic.LUSHR;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@ -28,7 +27,6 @@ import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
@ -42,8 +40,6 @@ import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
/**
@ -69,6 +65,8 @@ public class ShutdownTask implements ConsumerTask {
@NonNull
private final InitialPositionInStreamExtended initialPositionInStream;
private final boolean cleanupLeasesOfCompletedShards;
private final boolean garbageCollectLeases = false;
private final boolean isLeaseTableEmpty= false;
private final boolean ignoreUnexpectedChildShards;
@NonNull
private final LeaseCoordinator leaseCoordinator;
@ -155,7 +153,8 @@ public class ShutdownTask implements ConsumerTask {
log.debug("Looking for child shards of shard {}", shardInfo.shardId());
// create leases for the child shards
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseCoordinator.leaseRefresher(),
initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, latestShards);
initialPositionInStream, latestShards, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, garbageCollectLeases,
isLeaseTableEmpty);
log.debug("Finished checking for child shards of shard {}", shardInfo.shardId());
}

View file

@ -21,6 +21,7 @@ package software.amazon.kinesis.leases;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.never;
@ -41,6 +42,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.junit.Before;
@ -53,6 +55,8 @@ import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.awssdk.services.kinesis.model.HashKeyRange;
import software.amazon.awssdk.services.kinesis.model.SequenceNumberRange;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardFilter;
import software.amazon.awssdk.services.kinesis.model.ShardFilterType;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamIdentifier;
@ -80,6 +84,7 @@ public class HierarchicalShardSyncerTest {
private static final HierarchicalShardSyncer.MultiStreamArgs MULTI_STREAM_ARGS = new HierarchicalShardSyncer.MultiStreamArgs(
MULTISTREAM_MODE_ON, StreamIdentifier.multiStreamInstance(STREAM_IDENTIFIER));
private final boolean garbageCollectLeases = true;
private final boolean cleanupLeasesOfCompletedShards = true;
private final boolean ignoreUnexpectedChildShards = false;
@ -112,9 +117,10 @@ public class HierarchicalShardSyncerTest {
public void testDetermineNewLeasesToCreateNoShards() {
final List<Shard> shards = Collections.emptyList();
final List<Lease> leases = Collections.emptyList();
final HierarchicalShardSyncer.LeaseSynchronizer emptyLeaseTableSynchronizer = new HierarchicalShardSyncer.EmptyLeaseTableSynchronizer();
assertThat(HierarchicalShardSyncer.determineNewLeasesToCreate(shards, leases, INITIAL_POSITION_LATEST).isEmpty(),
equalTo(true));
assertThat(HierarchicalShardSyncer.determineNewLeasesToCreate(emptyLeaseTableSynchronizer, shards, leases,
INITIAL_POSITION_LATEST).isEmpty(), equalTo(true));
}
/**
@ -123,10 +129,11 @@ public class HierarchicalShardSyncerTest {
@Test public void testDetermineNewLeasesToCreateNoShardsForMultiStream() {
final List<Shard> shards = Collections.emptyList();
final List<Lease> leases = Collections.emptyList();
final HierarchicalShardSyncer.LeaseSynchronizer emptyLeaseTableSynchronizer = new HierarchicalShardSyncer.EmptyLeaseTableSynchronizer();
assertThat(HierarchicalShardSyncer
.determineNewLeasesToCreate(shards, leases, INITIAL_POSITION_LATEST, new HashSet<>(), MULTI_STREAM_ARGS)
.isEmpty(), equalTo(true));
.determineNewLeasesToCreate(emptyLeaseTableSynchronizer, shards, leases, INITIAL_POSITION_LATEST,
new HashSet<>(), MULTI_STREAM_ARGS).isEmpty(), equalTo(true));
}
/**
@ -141,9 +148,10 @@ public class HierarchicalShardSyncerTest {
final List<Shard> shards = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange),
ShardObjectHelper.newShard(shardId1, null, null, sequenceRange));
final List<Lease> currentLeases = Collections.emptyList();
final HierarchicalShardSyncer.LeaseSynchronizer emptyLeaseTableSynchronizer = new HierarchicalShardSyncer.EmptyLeaseTableSynchronizer();
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
INITIAL_POSITION_LATEST);
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(emptyLeaseTableSynchronizer,
shards, currentLeases, INITIAL_POSITION_LATEST);
final Set<String> newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final Set<String> expectedLeaseShardIds = new HashSet<>(Arrays.asList(shardId0, shardId1));
@ -163,9 +171,10 @@ public class HierarchicalShardSyncerTest {
final List<Shard> shards = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange),
ShardObjectHelper.newShard(shardId1, null, null, sequenceRange));
final List<Lease> currentLeases = Collections.emptyList();
final HierarchicalShardSyncer.LeaseSynchronizer emptyLeaseTableSynchronizer = new HierarchicalShardSyncer.EmptyLeaseTableSynchronizer();
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
INITIAL_POSITION_LATEST, new HashSet<>(), MULTI_STREAM_ARGS);
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(emptyLeaseTableSynchronizer,
shards, currentLeases, INITIAL_POSITION_LATEST, new HashSet<>(), MULTI_STREAM_ARGS);
final Set<String> newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final Set<String> expectedLeaseIds = new HashSet<>(
toMultiStreamLeaseList(Arrays.asList(shardId0, shardId1)));
@ -175,7 +184,7 @@ public class HierarchicalShardSyncerTest {
}
/**
* Test determineNewLeasesToCreate() where there are no leases and no resharding operations have been performed, but
* Test determineNewLeasesToCreate() where there is one lease and no resharding operations have been performed, but
* one of the shards was marked as inconsistent.
*/
@Test
@ -183,16 +192,24 @@ public class HierarchicalShardSyncerTest {
final String shardId0 = "shardId-0";
final String shardId1 = "shardId-1";
final String shardId2 = "shardId-2";
final String shardId3 = "shardId-3";
final SequenceNumberRange sequenceRange = ShardObjectHelper.newSequenceNumberRange("342980", null);
final List<Shard> shards = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange),
final List<Shard> shardsWithLeases = Arrays.asList(ShardObjectHelper.newShard(shardId3, null, null, sequenceRange));
final List<Shard> shardsWithoutLeases = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange),
ShardObjectHelper.newShard(shardId1, null, null, sequenceRange),
ShardObjectHelper.newShard(shardId2, shardId1, null, sequenceRange));
final List<Lease> currentLeases = Collections.emptyList();
final List<Shard> shards = Stream.of(shardsWithLeases, shardsWithoutLeases).flatMap(x -> x.stream()).collect(Collectors.toList());
final List<Lease> currentLeases = createLeasesFromShards(shardsWithLeases, ExtendedSequenceNumber.LATEST, "foo");
final Set<String> inconsistentShardIds = new HashSet<>(Collections.singletonList(shardId2));
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
Map<String, Shard> shardIdToShardMap = HierarchicalShardSyncer.constructShardIdToShardMap(shards);
Map<String, Set<String>> shardIdToChildShardIdsMap = HierarchicalShardSyncer.constructShardIdToChildShardIdsMap(shardIdToShardMap);
final HierarchicalShardSyncer.LeaseSynchronizer leaseSynchronizer =
new HierarchicalShardSyncer.NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap);
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases,
INITIAL_POSITION_LATEST, inconsistentShardIds);
final Set<String> newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final Set<String> expectedLeaseShardIds = new HashSet<>(Arrays.asList(shardId0, shardId1));
@ -209,16 +226,24 @@ public class HierarchicalShardSyncerTest {
final String shardId0 = "shardId-0";
final String shardId1 = "shardId-1";
final String shardId2 = "shardId-2";
final String shardId3 = "shardId-3";
final SequenceNumberRange sequenceRange = ShardObjectHelper.newSequenceNumberRange("342980", null);
final List<Shard> shards = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange),
final List<Shard> shardsWithLeases = Arrays.asList(ShardObjectHelper.newShard(shardId3, null, null, sequenceRange));
final List<Shard> shardsWithoutLeases = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange),
ShardObjectHelper.newShard(shardId1, null, null, sequenceRange),
ShardObjectHelper.newShard(shardId2, shardId1, null, sequenceRange));
final List<Lease> currentLeases = Collections.emptyList();
final List<Shard> shards = Stream.of(shardsWithLeases, shardsWithoutLeases).flatMap(x -> x.stream()).collect(Collectors.toList());
final List<Lease> currentLeases = new ArrayList(createMultiStreamLeasesFromShards(shardsWithLeases, ExtendedSequenceNumber.LATEST, "foo"));
final Set<String> inconsistentShardIds = new HashSet<>(Collections.singletonList(shardId2));
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
Map<String, Shard> shardIdToShardMap = HierarchicalShardSyncer.constructShardIdToShardMap(shards);
Map<String, Set<String>> shardIdToChildShardIdsMap = HierarchicalShardSyncer.constructShardIdToChildShardIdsMap(shardIdToShardMap);
final HierarchicalShardSyncer.LeaseSynchronizer leaseSynchronizer =
new HierarchicalShardSyncer.NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap);
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases,
INITIAL_POSITION_LATEST, inconsistentShardIds, MULTI_STREAM_ARGS);
final Set<String> newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final Set<String> expectedLeaseShardIds = new HashSet<>(
@ -258,7 +283,7 @@ public class HierarchicalShardSyncerTest {
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
cleanupLeasesOfCompletedShards, false, SCOPE);
SCOPE, cleanupLeasesOfCompletedShards, false, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
final Set<String> expectedShardIds = new HashSet<>(
Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10"));
@ -292,7 +317,7 @@ public class HierarchicalShardSyncerTest {
setupMultiStream();
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
cleanupLeasesOfCompletedShards, false, SCOPE);
SCOPE, cleanupLeasesOfCompletedShards, false, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
final Set<String> expectedShardIds = new HashSet<>(
toMultiStreamLeaseList(Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10")));
@ -334,7 +359,8 @@ public class HierarchicalShardSyncerTest {
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
cleanupLeasesOfCompletedShards, false, SCOPE, latestShards);
latestShards, cleanupLeasesOfCompletedShards, false, SCOPE, garbageCollectLeases,
dynamoDBLeaseRefresher.isLeaseTableEmpty());
final Set<String> expectedShardIds = new HashSet<>(
Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10"));
@ -370,7 +396,8 @@ public class HierarchicalShardSyncerTest {
setupMultiStream();
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
cleanupLeasesOfCompletedShards, false, SCOPE, latestShards);
latestShards, cleanupLeasesOfCompletedShards, false, SCOPE, garbageCollectLeases,
dynamoDBLeaseRefresher.isLeaseTableEmpty());
final Set<String> expectedShardIds = new HashSet<>(
toMultiStreamLeaseList(Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10")));
@ -406,7 +433,8 @@ public class HierarchicalShardSyncerTest {
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
cleanupLeasesOfCompletedShards, false, SCOPE, new ArrayList<Shard>());
new ArrayList<Shard>(), cleanupLeasesOfCompletedShards, false, SCOPE, garbageCollectLeases,
dynamoDBLeaseRefresher.isLeaseTableEmpty());
final Set<String> expectedShardIds = new HashSet<>();
@ -446,7 +474,7 @@ public class HierarchicalShardSyncerTest {
try {
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher,
INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, false, SCOPE);
INITIAL_POSITION_TRIM_HORIZON, SCOPE, cleanupLeasesOfCompletedShards, false, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
} finally {
verify(shardDetector).listShards();
verify(dynamoDBLeaseRefresher, never()).listLeases();
@ -466,7 +494,8 @@ public class HierarchicalShardSyncerTest {
setupMultiStream();
try {
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher,
INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, false, SCOPE);
INITIAL_POSITION_TRIM_HORIZON, SCOPE, cleanupLeasesOfCompletedShards, false, garbageCollectLeases,
dynamoDBLeaseRefresher.isLeaseTableEmpty());
} finally {
verify(shardDetector).listShards();
verify(dynamoDBLeaseRefresher, never()).listLeases();
@ -501,7 +530,7 @@ public class HierarchicalShardSyncerTest {
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
cleanupLeasesOfCompletedShards, true, SCOPE);
SCOPE, cleanupLeasesOfCompletedShards, true, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
final List<Lease> leases = leaseCaptor.getAllValues();
final Set<String> leaseKeys = leases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
@ -546,7 +575,7 @@ public class HierarchicalShardSyncerTest {
setupMultiStream();
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
cleanupLeasesOfCompletedShards, true, SCOPE);
SCOPE, cleanupLeasesOfCompletedShards, true, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
final List<Lease> leases = leaseCaptor.getAllValues();
final Set<String> leaseKeys = leases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
@ -602,7 +631,7 @@ public class HierarchicalShardSyncerTest {
// Initial call: No leases present, create leases.
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
final Set<Lease> createLeases = new HashSet<>(leaseCreateCaptor.getAllValues());
@ -617,7 +646,7 @@ public class HierarchicalShardSyncerTest {
// Second call: Leases present, with shardId-0 being at ShardEnd causing cleanup.
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
final List<Lease> deleteLeases = leaseDeleteCaptor.getAllValues();
final Set<String> shardIds = deleteLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final Set<ExtendedSequenceNumber> sequenceNumbers = deleteLeases.stream().map(Lease::checkpoint)
@ -677,7 +706,7 @@ public class HierarchicalShardSyncerTest {
// Initial call: Call to create leases.
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
final Set<Lease> createLeases = new HashSet<>(leaseCreateCaptor.getAllValues());
@ -694,7 +723,7 @@ public class HierarchicalShardSyncerTest {
// Second call: Leases already present. ShardId-0 is at ShardEnd and needs to be cleaned up. Delete fails.
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
} finally {
List<Lease> deleteLeases = leaseDeleteCaptor.getAllValues();
Set<String> shardIds = deleteLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
@ -718,7 +747,7 @@ public class HierarchicalShardSyncerTest {
// Final call: Leases already present. ShardId-0 is at ShardEnd and needs to be cleaned up. Delete passes.
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
deleteLeases = leaseDeleteCaptor.getAllValues();
@ -779,7 +808,7 @@ public class HierarchicalShardSyncerTest {
// Initial call: Call to create leases. Fails on ListLeases
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
} finally {
verify(shardDetector, times(1)).listShards();
verify(dynamoDBLeaseRefresher, times(1)).listLeases();
@ -789,7 +818,7 @@ public class HierarchicalShardSyncerTest {
// Second call: Leases not present, leases will be created.
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
final Set<Lease> createLeases = new HashSet<>(leaseCreateCaptor.getAllValues());
final Set<Lease> expectedCreateLeases = new HashSet<>(createLeasesFromShards(shards, sequenceNumber, null));
@ -804,7 +833,7 @@ public class HierarchicalShardSyncerTest {
// Final call: Leases present, belongs to TestOwner, shardId-0 is at ShardEnd should be cleaned up.
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
final List<Lease> deleteLeases = leaseDeleteCaptor.getAllValues();
final Set<String> shardIds = deleteLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
@ -870,7 +899,7 @@ public class HierarchicalShardSyncerTest {
// Initial call: No leases present, create leases. Create lease Fails
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
} finally {
verify(shardDetector, times(1)).listShards();
verify(dynamoDBLeaseRefresher, times(1)).listLeases();
@ -879,7 +908,7 @@ public class HierarchicalShardSyncerTest {
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
final Set<Lease> createLeases = new HashSet<>(leaseCreateCaptor.getAllValues());
final Set<Lease> expectedCreateLeases = new HashSet<>(createLeasesFromShards(shards, sequenceNumber, null));
@ -894,7 +923,7 @@ public class HierarchicalShardSyncerTest {
// Final call: Leases are present, shardId-0 is at ShardEnd needs to be cleaned up.
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
final List<Lease> deleteLeases = leaseDeleteCaptor.getAllValues();
final Set<String> shardIds = deleteLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
@ -984,7 +1013,7 @@ public class HierarchicalShardSyncerTest {
doNothing().when(dynamoDBLeaseRefresher).deleteLease(leaseCaptor.capture());
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher,
INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
INITIAL_POSITION_TRIM_HORIZON, SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
assertThat(leaseCaptor.getAllValues().size(), equalTo(1));
assertThat(leaseCaptor.getValue(), equalTo(garbageLease));
@ -1013,7 +1042,7 @@ public class HierarchicalShardSyncerTest {
doNothing().when(dynamoDBLeaseRefresher).deleteLease(leaseCaptor.capture());
setupMultiStream();
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher,
INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
INITIAL_POSITION_TRIM_HORIZON, SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
assertThat(leaseCaptor.getAllValues().size(), equalTo(1));
assertThat(leaseCaptor.getValue(), equalTo(garbageLease));
@ -1045,7 +1074,7 @@ public class HierarchicalShardSyncerTest {
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, initialPosition,
cleanupLeasesOfCompletedShards, false, SCOPE);
SCOPE, cleanupLeasesOfCompletedShards, false, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
final List<Lease> leases = leaseCaptor.getAllValues();
final Set<String> leaseKeys = leases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
@ -1069,6 +1098,7 @@ public class HierarchicalShardSyncerTest {
final String shardId0 = "shardId-0";
final String shardId1 = "shardId-1";
final List<Lease> currentLeases = new ArrayList<>();
final HierarchicalShardSyncer.LeaseSynchronizer emptyLeaseTableSynchronizer = new HierarchicalShardSyncer.EmptyLeaseTableSynchronizer();
final SequenceNumberRange sequenceRange = ShardObjectHelper.newSequenceNumberRange("342980", null);
final List<Shard> shards = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange),
@ -1080,7 +1110,7 @@ public class HierarchicalShardSyncerTest {
final Set<String> expectedLeaseShardIds = new HashSet<>(Arrays.asList(shardId0, shardId1));
for (InitialPositionInStreamExtended initialPosition : initialPositions) {
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(emptyLeaseTableSynchronizer, shards, currentLeases,
initialPosition);
assertThat(newLeases.size(), equalTo(2));
@ -1092,18 +1122,33 @@ public class HierarchicalShardSyncerTest {
}
}
/**
* Tests that leases are not created for closed shards.
*/
@Test
public void testDetermineNewLeasesToCreateIgnoreClosedShard() {
final String lastShardId = "shardId-1";
final List<Lease> currentLeases = new ArrayList<>();
final List<Shard> shards = Arrays.asList(
final List<Shard> shardsWithoutLeases = Arrays.asList(
ShardObjectHelper.newShard("shardId-0", null, null,
ShardObjectHelper.newSequenceNumberRange("303", "404")),
ShardObjectHelper.newShard(lastShardId, null, null,
ShardObjectHelper.newSequenceNumberRange("405", null)));
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
final List<Shard> shardsWithLeases = Arrays.asList(ShardObjectHelper.newShard("shardId-2", null,
null, ShardObjectHelper.newSequenceNumberRange("202", "302")));
final List<Shard> shards = Stream.of(shardsWithLeases, shardsWithoutLeases).flatMap(x -> x.stream()).collect(Collectors.toList());
final List<Lease> currentLeases = createLeasesFromShards(shardsWithLeases, ExtendedSequenceNumber.LATEST, "foo");
final Set<String> inconsistentShardIds = Collections.emptySet();
Map<String, Shard> shardIdToShardMap = HierarchicalShardSyncer.constructShardIdToShardMap(shards);
Map<String, Set<String>> shardIdToChildShardIdsMap = HierarchicalShardSyncer.constructShardIdToChildShardIdsMap(shardIdToShardMap);
final HierarchicalShardSyncer.LeaseSynchronizer leaseSynchronizer =
new HierarchicalShardSyncer.NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap);
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases,
INITIAL_POSITION_LATEST);
assertThat(newLeases.size(), equalTo(1));
@ -1125,9 +1170,15 @@ public class HierarchicalShardSyncerTest {
final List<Shard> shards = constructShardListForGraphA();
final List<Lease> currentLeases = Arrays.asList(newLease("shardId-3"), newLease("shardId-4"),
newLease("shardId-5"));
final Map<String, Shard> shardIdToShardMap = HierarchicalShardSyncer.constructShardIdToShardMap(shards);
final Map<String, Set<String>> shardIdToChildShardIdsMap = HierarchicalShardSyncer
.constructShardIdToChildShardIdsMap(shardIdToShardMap);
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
INITIAL_POSITION_LATEST);
final HierarchicalShardSyncer.LeaseSynchronizer nonEmptyLeaseTableSynchronizer =
new HierarchicalShardSyncer.NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap);
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(nonEmptyLeaseTableSynchronizer,
shards, currentLeases, INITIAL_POSITION_LATEST);
final Map<String, ExtendedSequenceNumber> expectedShardIdCheckpointMap = new HashMap<>();
expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.TRIM_HORIZON);
@ -1161,8 +1212,15 @@ public class HierarchicalShardSyncerTest {
final List<Lease> currentLeases = Arrays.asList(newLease("shardId-4"), newLease("shardId-5"),
newLease("shardId-7"));
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
INITIAL_POSITION_LATEST);
final Map<String, Shard> shardIdToShardMap = HierarchicalShardSyncer.constructShardIdToShardMap(shards);
final Map<String, Set<String>> shardIdToChildShardIdsMap = HierarchicalShardSyncer
.constructShardIdToChildShardIdsMap(shardIdToShardMap);
final HierarchicalShardSyncer.LeaseSynchronizer nonEmptyLeaseTableSynchronizer =
new HierarchicalShardSyncer.NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap);
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(nonEmptyLeaseTableSynchronizer,
shards, currentLeases, INITIAL_POSITION_LATEST);
final Map<String, ExtendedSequenceNumber> expectedShardIdCheckpointMap = new HashMap<>();
expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.TRIM_HORIZON);
@ -1194,8 +1252,15 @@ public class HierarchicalShardSyncerTest {
final List<Lease> currentLeases = Arrays.asList(newLease("shardId-3"), newLease("shardId-4"),
newLease("shardId-5"));
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
INITIAL_POSITION_TRIM_HORIZON);
final Map<String, Shard> shardIdToShardMap = HierarchicalShardSyncer.constructShardIdToShardMap(shards);
final Map<String, Set<String>> shardIdToChildShardIdsMap = HierarchicalShardSyncer
.constructShardIdToChildShardIdsMap(shardIdToShardMap);
final HierarchicalShardSyncer.LeaseSynchronizer nonEmptyLeaseTableSynchronizer =
new HierarchicalShardSyncer.NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap);
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(nonEmptyLeaseTableSynchronizer,
shards, currentLeases, INITIAL_POSITION_TRIM_HORIZON);
final Set<String> leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final List<ExtendedSequenceNumber> checkpoints = newLeases.stream().map(Lease::checkpoint)
@ -1229,8 +1294,15 @@ public class HierarchicalShardSyncerTest {
final List<Lease> currentLeases = Arrays.asList(newLease("shardId-4"), newLease("shardId-5"),
newLease("shardId-7"));
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
INITIAL_POSITION_TRIM_HORIZON);
final Map<String, Shard> shardIdToShardMap = HierarchicalShardSyncer.constructShardIdToShardMap(shards);
final Map<String, Set<String>> shardIdToChildShardIdsMap = HierarchicalShardSyncer
.constructShardIdToChildShardIdsMap(shardIdToShardMap);
final HierarchicalShardSyncer.LeaseSynchronizer nonEmptyLeaseTableSynchronizer =
new HierarchicalShardSyncer.NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap);
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(nonEmptyLeaseTableSynchronizer,
shards, currentLeases, INITIAL_POSITION_TRIM_HORIZON);
final Set<String> leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final List<ExtendedSequenceNumber> checkpoints = newLeases.stream().map(Lease::checkpoint)
@ -1259,8 +1331,15 @@ public class HierarchicalShardSyncerTest {
final List<Shard> shards = constructShardListForGraphB();
final List<Lease> currentLeases = new ArrayList<>();
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
INITIAL_POSITION_TRIM_HORIZON);
final Map<String, Shard> shardIdToShardMap = HierarchicalShardSyncer.constructShardIdToShardMap(shards);
final Map<String, Set<String>> shardIdToChildShardIdsMap = HierarchicalShardSyncer
.constructShardIdToChildShardIdsMap(shardIdToShardMap);
final HierarchicalShardSyncer.LeaseSynchronizer nonEmptyLeaseTableSynchronizer =
new HierarchicalShardSyncer.NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap);
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(nonEmptyLeaseTableSynchronizer,
shards, currentLeases, INITIAL_POSITION_TRIM_HORIZON);
final Set<String> leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final List<ExtendedSequenceNumber> checkpoints = newLeases.stream().map(Lease::checkpoint)
@ -1294,8 +1373,15 @@ public class HierarchicalShardSyncerTest {
final List<Lease> currentLeases = Arrays.asList(newLease("shardId-3"), newLease("shardId-4"),
newLease("shardId-5"));
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
INITIAL_POSITION_AT_TIMESTAMP);
final Map<String, Shard> shardIdToShardMap = HierarchicalShardSyncer.constructShardIdToShardMap(shards);
final Map<String, Set<String>> shardIdToChildShardIdsMap = HierarchicalShardSyncer
.constructShardIdToChildShardIdsMap(shardIdToShardMap);
final HierarchicalShardSyncer.LeaseSynchronizer nonEmptyLeaseTableSynchronizer =
new HierarchicalShardSyncer.NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap);
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(nonEmptyLeaseTableSynchronizer,
shards, currentLeases, INITIAL_POSITION_AT_TIMESTAMP);
final Set<String> leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final List<ExtendedSequenceNumber> checkpoints = newLeases.stream().map(Lease::checkpoint)
.collect(Collectors.toList());
@ -1328,8 +1414,15 @@ public class HierarchicalShardSyncerTest {
final List<Lease> currentLeases = Arrays.asList(newLease("shardId-4"), newLease("shardId-5"),
newLease("shardId-7"));
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
INITIAL_POSITION_AT_TIMESTAMP);
final Map<String, Shard> shardIdToShardMap = HierarchicalShardSyncer.constructShardIdToShardMap(shards);
final Map<String, Set<String>> shardIdToChildShardIdsMap = HierarchicalShardSyncer
.constructShardIdToChildShardIdsMap(shardIdToShardMap);
final HierarchicalShardSyncer.LeaseSynchronizer nonEmptyLeaseTableSynchronizer =
new HierarchicalShardSyncer.NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap);
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(nonEmptyLeaseTableSynchronizer,
shards, currentLeases, INITIAL_POSITION_AT_TIMESTAMP);
final Set<String> leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final List<ExtendedSequenceNumber> checkpoints = newLeases.stream().map(Lease::checkpoint)
.collect(Collectors.toList());
@ -1355,8 +1448,15 @@ public class HierarchicalShardSyncerTest {
final List<Shard> shards = constructShardListForGraphB();
final List<Lease> currentLeases = new ArrayList<>();
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
INITIAL_POSITION_AT_TIMESTAMP);
final Map<String, Shard> shardIdToShardMap = HierarchicalShardSyncer.constructShardIdToShardMap(shards);
final Map<String, Set<String>> shardIdToChildShardIdsMap = HierarchicalShardSyncer
.constructShardIdToChildShardIdsMap(shardIdToShardMap);
final HierarchicalShardSyncer.LeaseSynchronizer nonEmptyLeaseTableSynchronizer =
new HierarchicalShardSyncer.NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap);
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(nonEmptyLeaseTableSynchronizer,
shards, currentLeases, INITIAL_POSITION_AT_TIMESTAMP);
final Set<String> leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final List<ExtendedSequenceNumber> checkpoints = newLeases.stream().map(Lease::checkpoint)
.collect(Collectors.toList());
@ -1515,7 +1615,74 @@ public class HierarchicalShardSyncerTest {
assertThat(newLeaseMap.isEmpty(), equalTo(true));
}
// /**
/**
* Tests that when reading from TIP, we use the AT_LATEST shard filter.
* @throws Exception
*/
@Test
public void testEmptyLeaseTableBootstrapUsesShardFilterWithAtLatest() throws Exception {
ShardFilter shardFilter = ShardFilter.builder().type(ShardFilterType.AT_LATEST).build();
testEmptyLeaseTableBootstrapUsesListShardsWithFilter(INITIAL_POSITION_LATEST, shardFilter);
}
/**
* Tests that when reading from TRIM, we use the TRIM_HORIZON shard filter.
* @throws Exception
*/
@Test
public void testEmptyLeaseTableBootstrapUsesShardFilterWithAtTrimHorizon() throws Exception {
ShardFilter shardFilter = ShardFilter.builder().type(ShardFilterType.AT_TRIM_HORIZON).build();
testEmptyLeaseTableBootstrapUsesListShardsWithFilter(INITIAL_POSITION_TRIM_HORIZON, shardFilter);
}
/**
* Tests that when reading from AT_TIMESTAMP, we use the AT_TIMESTAMP shard filter.
* @throws Exception
*/
@Test
public void testEmptyLeaseTableBootstrapUsesShardFilterWithAtTimestamp() throws Exception {
ShardFilter shardFilter = ShardFilter.builder().type(ShardFilterType.AT_TIMESTAMP).timestamp(new Date(1000L).toInstant()).build();
testEmptyLeaseTableBootstrapUsesListShardsWithFilter(INITIAL_POSITION_AT_TIMESTAMP, shardFilter);
}
public void testEmptyLeaseTableBootstrapUsesListShardsWithFilter(InitialPositionInStreamExtended initialPosition, ShardFilter shardFilter) throws Exception {
final String shardId0 = "shardId-0";
final List<Shard> shards = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null,
ShardObjectHelper.newSequenceNumberRange("1", null), ShardObjectHelper.newHashKeyRange(ShardObjectHelper.MIN_HASH_KEY, ShardObjectHelper.MAX_HASH_KEY)));
when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(true);
when(shardDetector.listShardsWithFilter(shardFilter)).thenReturn(shards);
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, initialPosition,
SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
verify(shardDetector, atLeast(1)).listShardsWithFilter(shardFilter);
verify(shardDetector, never()).listShards();
}
@Test
public void testNonEmptyLeaseTableUsesListShards() throws Exception {
final String shardId0 = "shardId-0";
final String shardId1 = "shardId-1";
final List<Shard> shardsWithLeases = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, ShardObjectHelper.newSequenceNumberRange("1", "2")));
final List<Shard> shardsWithoutLeases = Arrays.asList(ShardObjectHelper.newShard(shardId1, null, null, ShardObjectHelper.newSequenceNumberRange("3", "4")));
final List<Lease> currentLeases = createLeasesFromShards(shardsWithLeases, ExtendedSequenceNumber.LATEST, "foo");
when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(false);
when(shardDetector.listShards()).thenReturn(shardsWithoutLeases);
when(dynamoDBLeaseRefresher.listLeases()).thenReturn(currentLeases);
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, dynamoDBLeaseRefresher.isLeaseTableEmpty());
verify(shardDetector, atLeast(1)).listShards();
}
// /**getShardFilterFromInitialPosition
// * Test CheckIfDescendantAndAddNewLeasesForAncestors - two parents, there is a lease for one parent.
// */
// @Test

View file

@ -118,7 +118,8 @@ public class ShardSyncTaskIntegrationTest {
leaseRefresher.deleteAll();
Set<String> shardIds = shardDetector.listShards().stream().map(Shard::shardId).collect(Collectors.toSet());
ShardSyncTask syncTask = new ShardSyncTask(shardDetector, leaseRefresher,
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST), false, false, 0L,
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST),
false, true, false, 0L,
hierarchicalShardSyncer, NULL_METRICS_FACTORY);
syncTask.call();
List<Lease> leases = leaseRefresher.listLeases();

View file

@ -18,6 +18,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.never;
@ -42,7 +43,6 @@ import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.ShardDetector;
@ -127,6 +127,9 @@ public class ShutdownTaskTest {
*/
@Test
public final void testCallWhenSyncingShardsThrows() throws Exception {
final boolean garbageCollectLeases = false;
final boolean isLeaseTableEmpty = false;
List<Shard> latestShards = constructShardListGraphA();
when(shardDetector.listShards()).thenReturn(latestShards);
when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END);
@ -136,8 +139,8 @@ public class ShutdownTaskTest {
throw new KinesisClientLibIOException("KinesisClientLibIOException");
}).when(hierarchicalShardSyncer)
.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, INITIAL_POSITION_TRIM_HORIZON,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards,
NULL_METRICS_FACTORY.createMetrics(), latestShards);
latestShards, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards,
NULL_METRICS_FACTORY.createMetrics(), garbageCollectLeases, isLeaseTableEmpty);
final TaskResult result = task.call();
assertNotNull(result.getException());