Add empty lease table sync

This commit is contained in:
Ashwin Giridharan 2020-02-19 18:43:45 -08:00 committed by Joshua Kim
parent 0285789a24
commit 7349c7655f
10 changed files with 595 additions and 443 deletions

View file

@ -0,0 +1,78 @@
package software.amazon.kinesis.leases;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import static software.amazon.kinesis.leases.HierarchicalShardSyncer.constructShardIdToShardMap;
import static software.amazon.kinesis.leases.HierarchicalShardSyncer.convertToCheckpoint;
import static software.amazon.kinesis.leases.HierarchicalShardSyncer.newKCLLease;
@Slf4j
@AllArgsConstructor
public class EmptyLeaseTableSynchronizer implements LeaseSynchronizer {
@Override
public List<Lease> determineNewLeasesToCreate(List<Shard> shards, List<Lease> currentLeases,
InitialPositionInStreamExtended initialPosition,
Set<String> inconsistentShardIds,
HierarchicalShardSyncer.MultiStreamArgs multiStreamArgs) {
final Map<String, Shard> shardIdToShardMapOfAllKinesisShards = constructShardIdToShardMap(shards);
currentLeases.stream().peek(lease -> log.debug("Existing lease: {}", lease))
.map(lease -> shardIdFromLeaseDeducer.apply(lease, multiStreamArgs))
.collect(Collectors.toSet());
final List<Lease> newLeasesToCreate = getLeasesToCreateForOpenAndClosedShards(initialPosition, shards);
final Comparator<Lease> startingSequenceNumberComparator =
new HierarchicalShardSyncer.StartingSequenceNumberAndShardIdBasedComparator(shardIdToShardMapOfAllKinesisShards, multiStreamArgs);
newLeasesToCreate.sort(startingSequenceNumberComparator);
return newLeasesToCreate;
}
@Override
public void cleanupGarbageLeases(List<Shard> shards, LeaseRefresher leaseRefresher, List<Lease> trackedLeases, HierarchicalShardSyncer.MultiStreamArgs multiStreamArgs) throws DependencyException, ProvisionedThroughputException, InvalidStateException {
// Nothing to do here.
}
@Override
public void cleanupLeasesOfFinishedShards(LeaseRefresher leaseRefresher, List<Lease> currentLeases, List<Lease> trackedLeases, HierarchicalShardSyncer.MultiStreamArgs multiStreamArgs) throws DependencyException, ProvisionedThroughputException, InvalidStateException {
// Nothing to do here.
}
/**
* Helper method to create leases. For an empty lease table, we will be creating leases for all shards
* regardless of if they are open or closed. Closed shards will be unblocked via child shard information upon
* reaching SHARD_END.
*/
private List<Lease> getLeasesToCreateForOpenAndClosedShards(InitialPositionInStreamExtended initialPosition,
List<Shard> shards) {
final Map<String, Lease> shardIdToNewLeaseMap = new HashMap<>();
for (Shard shard : shards) {
final String shardId = shard.shardId();
final Lease lease = newKCLLease(shard);
lease.checkpoint(convertToCheckpoint(initialPosition));
log.debug("Need to create a lease for shard with shardId {}", shardId);
shardIdToNewLeaseMap.put(shardId, lease);
}
return new ArrayList(shardIdToNewLeaseMap.values());
}
}

View file

@ -24,6 +24,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Function;
@ -38,6 +39,8 @@ import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardFilter;
import software.amazon.awssdk.services.kinesis.model.ShardFilterType;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.InitialPositionInStream;
@ -99,7 +102,8 @@ public class HierarchicalShardSyncer {
final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards,
final MetricsScope scope) throws DependencyException, InvalidStateException,
ProvisionedThroughputException, KinesisClientLibIOException {
final List<Shard> latestShards = getShardList(shardDetector);
final List<Shard> latestShards = leaseRefresher.isLeaseTableEmpty() ?
getShardListAtInitialPosition(shardDetector, initialPosition) : getFullShardList(shardDetector);
checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards, scope, latestShards);
}
@ -125,8 +129,11 @@ public class HierarchicalShardSyncer {
getLeasesForStream(shardDetector.streamIdentifier(), leaseRefresher) :
leaseRefresher.listLeases();
final MultiStreamArgs multiStreamArgs = new MultiStreamArgs(isMultiStreamMode, shardDetector.streamIdentifier());
final List<Lease> newLeasesToCreate = determineNewLeasesToCreate(latestShards, currentLeases, initialPosition,
inconsistentShardIds, multiStreamArgs);
final LeaseSynchronizer leaseSynchronizer = leaseRefresher.isLeaseTableEmpty() ?
new EmptyLeaseTableSynchronizer() :
new NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap);
final List<Lease> newLeasesToCreate = determineNewLeasesToCreate(leaseSynchronizer, latestShards, currentLeases,
initialPosition, inconsistentShardIds, multiStreamArgs);
log.debug("Num new leases to create: {}", newLeasesToCreate.size());
for (Lease lease : newLeasesToCreate) {
long startTime = System.currentTimeMillis();
@ -140,13 +147,33 @@ public class HierarchicalShardSyncer {
}
final List<Lease> trackedLeases = new ArrayList<>(currentLeases);
trackedLeases.addAll(newLeasesToCreate);
cleanupGarbageLeases(shardDetector, latestShards, trackedLeases, leaseRefresher, multiStreamArgs);
leaseSynchronizer.cleanupGarbageLeases(latestShards, leaseRefresher, trackedLeases, multiStreamArgs);
if (cleanupLeasesOfCompletedShards) {
cleanupLeasesOfFinishedShards(currentLeases, shardIdToShardMap, shardIdToChildShardIdsMap, trackedLeases,
leaseRefresher, multiStreamArgs);
leaseSynchronizer.cleanupLeasesOfFinishedShards(leaseRefresher, currentLeases, trackedLeases, multiStreamArgs);
}
}
/**
 * Determines new leases to create for the given shards, assuming there are no inconsistent
 * shard ids. Note: Package level access only for testing purposes.
 */
static List<Lease> determineNewLeasesToCreate(LeaseSynchronizer leaseSynchronizer, final List<Shard> shards,
        final List<Lease> currentLeases, final InitialPositionInStreamExtended initialPosition) {
    // Delegate with an empty set of inconsistent shard ids.
    return determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, initialPosition,
            new HashSet<>());
}
/**
 * Determines new leases to create, defaulting to single-stream mode.
 * Note: Package level access only for testing purposes.
 */
static List<Lease> determineNewLeasesToCreate(LeaseSynchronizer leaseSynchronizer, final List<Shard> shards,
        final List<Lease> currentLeases, final InitialPositionInStreamExtended initialPosition,
        final Set<String> inconsistentShardIds) {
    // Single-stream mode: no stream identifier.
    final MultiStreamArgs singleStreamArgs = new MultiStreamArgs(false, null);
    return determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, initialPosition,
            inconsistentShardIds, singleStreamArgs);
}
/**
 * Delegates lease determination to the supplied {@link LeaseSynchronizer}, which encapsulates
 * the empty-vs-non-empty lease table strategy.
 * Note: Package level access only for testing purposes.
 */
static List<Lease> determineNewLeasesToCreate(LeaseSynchronizer leaseSynchronizer, final List<Shard> shards,
        final List<Lease> currentLeases, final InitialPositionInStreamExtended initialPosition,
        final Set<String> inconsistentShardIds, final MultiStreamArgs multiStreamArgs) {
    return leaseSynchronizer.determineNewLeasesToCreate(shards, currentLeases, initialPosition,
            inconsistentShardIds, multiStreamArgs);
}
// CHECKSTYLE:ON CyclomaticComplexity
/** Note: This method has package level access solely for testing purposes.
@ -200,75 +227,6 @@ public class HierarchicalShardSyncer {
.flatMap(entry -> shardIdToChildShardIdsMap.get(entry.getKey()).stream()).collect(Collectors.toSet());
}
/**
* Note: this has package level access for testing purposes.
* Useful for asserting that we don't have an incomplete shard list following a reshard operation.
* We verify that if the shard is present in the shard list, it is closed and its hash key range
* is covered by its child shards.
* @param shardIdsOfClosedShards Id of the shard which is expected to be closed
* @return ShardIds of child shards (children of the expectedClosedShard)
* @throws KinesisClientLibIOException
*/
synchronized void assertClosedShardsAreCoveredOrAbsent(final Map<String, Shard> shardIdToShardMap,
        final Map<String, Set<String>> shardIdToChildShardIdsMap, final Set<String> shardIdsOfClosedShards)
        throws KinesisClientLibIOException {
    final String exceptionMessageSuffix = "This can happen if we constructed the list of shards "
            + " while a reshard operation was in progress.";

    for (String shardId : shardIdsOfClosedShards) {
        final Shard shard = shardIdToShardMap.get(shardId);
        if (shard == null) {
            // The shard has aged out of the stream entirely; nothing to verify.
            log.info("Shard {} is not present in Kinesis anymore.", shardId);
            continue;
        }

        final String endingSequenceNumber = shard.sequenceNumberRange().endingSequenceNumber();
        if (endingSequenceNumber == null) {
            // Bug fix: report the offending shard id, not the entire set of closed shard ids,
            // which the previous message interpolated by mistake.
            throw new KinesisClientLibIOException("Shard " + shardId
                    + " is not closed. " + exceptionMessageSuffix);
        }

        final Set<String> childShardIds = shardIdToChildShardIdsMap.get(shardId);
        if (childShardIds == null) {
            throw new KinesisClientLibIOException("Incomplete shard list: Closed shard " + shardId
                    + " has no children." + exceptionMessageSuffix);
        }

        // A closed shard with children must have its hash range fully covered by them.
        assertHashRangeOfClosedShardIsCovered(shard, shardIdToShardMap, childShardIds);
    }
}
/**
 * Verifies that the hash key range of a closed shard is covered by the combined range of its
 * child shards, throwing if coverage is incomplete.
 * NOTE: only the outer envelope (min starting key / max ending key across children) is checked;
 * interior gaps between child ranges would not be detected by this logic.
 * NOTE(review): assumes every id in childShardIds is present in shardIdToShardMap — a missing
 * entry would NPE on childShard.hashKeyRange(); confirm callers guarantee this.
 *
 * @param closedShard the closed shard whose range must be covered
 * @param shardIdToShardMap map of shardId to Shard for all known shards
 * @param childShardIds ids of the closed shard's children
 * @throws KinesisClientLibIOException if the children do not cover the closed shard's range
 */
private synchronized void assertHashRangeOfClosedShardIsCovered(final Shard closedShard,
final Map<String, Shard> shardIdToShardMap, final Set<String> childShardIds)
throws KinesisClientLibIOException {
// Envelope of the children's hash key ranges, built up below.
BigInteger minStartingHashKeyOfChildren = null;
BigInteger maxEndingHashKeyOfChildren = null;
final BigInteger startingHashKeyOfClosedShard = new BigInteger(closedShard.hashKeyRange().startingHashKey());
final BigInteger endingHashKeyOfClosedShard = new BigInteger(closedShard.hashKeyRange().endingHashKey());
for (String childShardId : childShardIds) {
final Shard childShard = shardIdToShardMap.get(childShardId);
final BigInteger startingHashKey = new BigInteger(childShard.hashKeyRange().startingHashKey());
if (minStartingHashKeyOfChildren == null || startingHashKey.compareTo(minStartingHashKeyOfChildren) < 0) {
minStartingHashKeyOfChildren = startingHashKey;
}
final BigInteger endingHashKey = new BigInteger(childShard.hashKeyRange().endingHashKey());
if (maxEndingHashKeyOfChildren == null || endingHashKey.compareTo(maxEndingHashKeyOfChildren) > 0) {
maxEndingHashKeyOfChildren = endingHashKey;
}
}
// Fail if there were no children, or the children's envelope does not reach the closed
// shard's boundary on either side.
if (minStartingHashKeyOfChildren == null || maxEndingHashKeyOfChildren == null
|| minStartingHashKeyOfChildren.compareTo(startingHashKeyOfClosedShard) > 0
|| maxEndingHashKeyOfChildren.compareTo(endingHashKeyOfClosedShard) < 0) {
throw new KinesisClientLibIOException(String.format(
"Incomplete shard list: hash key range of shard %s is not covered by its child shards.",
closedShard.shardId()));
}
}
/**
* Helper method to construct shardId->setOfChildShardIds map.
@ -299,149 +257,44 @@ public class HierarchicalShardSyncer {
return shardIdToChildShardIdsMap;
}
private static List<Shard> getShardList(@NonNull final ShardDetector shardDetector) throws KinesisClientLibIOException {
final List<Shard> shards = shardDetector.listShards();
if (shards == null) {
throw new KinesisClientLibIOException(
"Stream is not in ACTIVE OR UPDATING state - will retry getting the shard list.");
private static ShardFilter getShardFilterFromInitialPosition(InitialPositionInStreamExtended initialPositionInStreamExtended) {
ShardFilter.Builder builder = ShardFilter.builder();
switch (initialPositionInStreamExtended.getInitialPositionInStream()) {
case LATEST:
builder = builder.type(ShardFilterType.AT_LATEST);
break;
case TRIM_HORIZON:
builder = builder.type(ShardFilterType.AT_TRIM_HORIZON);
break;
case AT_TIMESTAMP:
builder = builder.type(ShardFilterType.AT_TIMESTAMP).timestamp(initialPositionInStreamExtended.getTimestamp().toInstant());
break;
}
return shards;
return builder.build();
}
/**
* Determine new leases to create and their initial checkpoint.
* Note: Package level access only for testing purposes.
*
* For each open (no ending sequence number) shard without open parents that doesn't already have a lease,
* determine if it is a descendent of any shard which is or will be processed (e.g. for which a lease exists):
* If so, set checkpoint of the shard to TrimHorizon and also create leases for ancestors if needed.
* If not, set checkpoint of the shard to the initial position specified by the client.
* To check if we need to create leases for ancestors, we use the following rules:
* * If we began (or will begin) processing data for a shard, then we must reach end of that shard before
* we begin processing data from any of its descendants.
* * A shard does not start processing data until data from all its parents has been processed.
* Note, if the initial position is LATEST and a shard has two parents and only one is a descendant - we'll create
* leases corresponding to both the parents - the parent shard which is not a descendant will have
* its checkpoint set to Latest.
*
* We assume that if there is an existing lease for a shard, then either:
* * we have previously created a lease for its parent (if it was needed), or
* * the parent shard has expired.
*
* For example:
* Shard structure (each level depicts a stream segment):
* 0 1 2 3 4 5 - shards till epoch 102
* \ / \ / | |
* 6 7 4 5 - shards from epoch 103 - 205
* \ / | / \
* 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
* Current leases: (3, 4, 5)
* New leases to create: (2, 6, 7, 8, 9, 10)
*
* The leases returned are sorted by the starting sequence number - following the same order
* when persisting the leases in DynamoDB will ensure that we recover gracefully if we fail
* before creating all the leases.
*
* If a shard has no existing lease, is open, and is a descendant of a parent which is still open, we ignore it
* here; this happens when the list of shards is inconsistent, which could be due to pagination delay for very
* high shard count streams (i.e., dynamodb streams for tables with thousands of partitions). This can only
* currently happen here if ignoreUnexpectedChildShards was true in syncShardleases.
*
*
* @param shards List of all shards in Kinesis (we'll create new leases based on this set)
* @param currentLeases List of current leases
* @param initialPosition One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. We'll start fetching records from that
* location in the shard (when an application starts up for the first time - and there are no checkpoints).
* @param inconsistentShardIds Set of child shard ids having open parents.
* @return List of new leases to create sorted by starting sequenceNumber of the corresponding shard
*/
static List<Lease> determineNewLeasesToCreate(final List<Shard> shards, final List<Lease> currentLeases,
final InitialPositionInStreamExtended initialPosition, final Set<String> inconsistentShardIds,
final MultiStreamArgs multiStreamArgs) {
final Map<String, Lease> shardIdToNewLeaseMap = new HashMap<>();
final Map<String, Shard> shardIdToShardMapOfAllKinesisShards = constructShardIdToShardMap(shards);
static List<Shard> getShardListAtInitialPosition(@NonNull final ShardDetector shardDetector,
InitialPositionInStreamExtended initialPositionInStreamExtended) throws KinesisClientLibIOException {
final ShardFilter shardFilter = getShardFilterFromInitialPosition(initialPositionInStreamExtended);
final Optional<List<Shard>> shards = Optional.of(shardDetector.listShardsWithFilter(shardFilter));
final Set<String> shardIdsOfCurrentLeases = currentLeases.stream()
.peek(lease -> log.debug("Existing lease: {}", lease))
.map(lease -> shardIdFromLeaseDeducer.apply(lease, multiStreamArgs))
.collect(Collectors.toSet());
log.debug("Retrieved {} shards with ShardFilter - {}.", shards.map(s -> s.size()).orElse(0), shardFilter);
final List<Shard> openShards = getOpenShards(shards);
final Map<String, Boolean> memoizationContext = new HashMap<>();
// Iterate over the open shards and find those that don't have any lease entries.
for (Shard shard : openShards) {
final String shardId = shard.shardId();
log.debug("Evaluating leases for open shard {} and its ancestors.", shardId);
if (shardIdsOfCurrentLeases.contains(shardId)) {
log.debug("Lease for shardId {} already exists. Not creating a lease", shardId);
} else if (inconsistentShardIds.contains(shardId)) {
log.info("shardId {} is an inconsistent child. Not creating a lease", shardId);
} else {
log.debug("Need to create a lease for shardId {}", shardId);
final Lease newLease = multiStreamArgs.isMultiStreamMode() ?
newKCLMultiStreamLease(shard, multiStreamArgs.streamIdentifier()) :
newKCLLease(shard);
final boolean isDescendant = checkIfDescendantAndAddNewLeasesForAncestors(shardId, initialPosition,
shardIdsOfCurrentLeases, shardIdToShardMapOfAllKinesisShards, shardIdToNewLeaseMap,
memoizationContext, multiStreamArgs);
/**
* If the shard is a descendant and the specified initial position is AT_TIMESTAMP, then the
* checkpoint should be set to AT_TIMESTAMP, else to TRIM_HORIZON. For AT_TIMESTAMP, we will add a
* lease just like we do for TRIM_HORIZON. However we will only return back records with server-side
* timestamp at or after the specified initial position timestamp.
*
* Shard structure (each level depicts a stream segment):
* 0 1 2 3 4 5 - shards till epoch 102
* \ / \ / | |
* 6 7 4 5 - shards from epoch 103 - 205
* \ / | /\
* 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
*
* Current leases: empty set
*
* For the above example, suppose the initial position in stream is set to AT_TIMESTAMP with
* timestamp value 206. We will then create new leases for all the shards (with checkpoint set to
* AT_TIMESTAMP), including the ancestor shards with epoch less than 206. However as we begin
* processing the ancestor shards, their checkpoints would be updated to SHARD_END and their leases
* would then be deleted since they won't have records with server-side timestamp at/after 206. And
* after that we will begin processing the descendant shards with epoch at/after 206 and we will
* return the records that meet the timestamp requirement for these shards.
*/
if (isDescendant
&& !initialPosition.getInitialPositionInStream().equals(InitialPositionInStream.AT_TIMESTAMP)) {
newLease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
} else {
newLease.checkpoint(convertToCheckpoint(initialPosition));
}
log.debug("Set checkpoint of {} to {}", newLease.leaseKey(), newLease.checkpoint());
shardIdToNewLeaseMap.put(shardId, newLease);
}
}
final List<Lease> newLeasesToCreate = new ArrayList<>(shardIdToNewLeaseMap.values());
final Comparator<Lease> startingSequenceNumberComparator = new StartingSequenceNumberAndShardIdBasedComparator(
shardIdToShardMapOfAllKinesisShards, multiStreamArgs);
newLeasesToCreate.sort(startingSequenceNumberComparator);
return newLeasesToCreate;
return shards.orElseThrow(() -> new KinesisClientLibIOException("Stream is not in ACTIVE OR UPDATING state - " +
"will retry getting the shard list."));
}
static List<Lease> determineNewLeasesToCreate(final List<Shard> shards, final List<Lease> currentLeases,
final InitialPositionInStreamExtended initialPosition, final Set<String> inconsistentShardIds) {
return determineNewLeasesToCreate(shards, currentLeases, initialPosition, inconsistentShardIds,
new MultiStreamArgs(false, null));
/**
 * Retrieves the complete, unfiltered shard list for the stream.
 *
 * @param shardDetector detector used to list shards
 * @return all shards in the stream
 * @throws KinesisClientLibIOException if the detector returned no shard list (e.g. the stream
 *         is not in ACTIVE or UPDATING state)
 */
static List<Shard> getFullShardList(@NonNull final ShardDetector shardDetector) throws KinesisClientLibIOException {
    // Bug fix: Optional.of throws NullPointerException when listShards() returns null, which
    // made the orElseThrow below unreachable. ofNullable preserves the intended translation of
    // a null shard list into KinesisClientLibIOException (mirroring the old getShardList check).
    final Optional<List<Shard>> shards = Optional.ofNullable(shardDetector.listShards());
    log.debug("Retrieved {} shards.", shards.map(s -> s.size()).orElse(0));
    return shards.orElseThrow(() -> new KinesisClientLibIOException("Stream is not in ACTIVE OR UPDATING state - " +
            "will retry getting the shard list."));
}
/**
* Determine new leases to create and their initial checkpoint.
* Note: Package level access only for testing purposes.
*/
static List<Lease> determineNewLeasesToCreate(final List<Shard> shards, final List<Lease> currentLeases,
final InitialPositionInStreamExtended initialPosition) {
final Set<String> inconsistentShardIds = new HashSet<>();
return determineNewLeasesToCreate(shards, currentLeases, initialPosition, inconsistentShardIds);
}
/**
* Note: Package level access for testing purposes only.
@ -571,49 +424,6 @@ public class HierarchicalShardSyncer {
return parentShardIds;
}
/**
* Delete leases corresponding to shards that no longer exist in the stream. Current scheme: Delete a lease if:
* <ul>
* <li>The corresponding shard is not present in the list of Kinesis shards</li>
* <li>The parentShardIds listed in the lease are also not present in the list of Kinesis shards.</li>
* </ul>
*
* @param shards
* List of all Kinesis shards (assumed to be a consistent snapshot - when stream is in Active state).
* @param trackedLeases
* List of
* @param leaseRefresher
* @throws KinesisClientLibIOException
* Thrown if we couldn't get a fresh shard list from Kinesis.
* @throws ProvisionedThroughputException
* @throws InvalidStateException
* @throws DependencyException
*/
private static void cleanupGarbageLeases(@NonNull final ShardDetector shardDetector, final List<Shard> shards,
final List<Lease> trackedLeases, final LeaseRefresher leaseRefresher,
final MultiStreamArgs multiStreamArgs) throws KinesisClientLibIOException,
DependencyException, InvalidStateException, ProvisionedThroughputException {
// Shard ids from the supplied (possibly stale) shard snapshot.
final Set<String> kinesisShards = shards.stream().map(Shard::shardId).collect(Collectors.toSet());
// Check if there are leases for non-existent shards
final List<Lease> garbageLeases = trackedLeases.stream()
.filter(lease -> isCandidateForCleanup(lease, kinesisShards, multiStreamArgs)).collect(Collectors.toList());
if (!CollectionUtils.isNullOrEmpty(garbageLeases)) {
log.info("Found {} candidate leases for cleanup. Refreshing list of"
+ " Kinesis shards to pick up recent/latest shards", garbageLeases.size());
// Re-fetch the shard list before deleting, so leases for shards created after the original
// snapshot are not mistakenly treated as garbage.
final Set<String> currentKinesisShardIds = getShardList(shardDetector).stream().map(Shard::shardId)
.collect(Collectors.toSet());
for (Lease lease : garbageLeases) {
// Only delete leases that are still orphans against the fresh shard list.
if (isCandidateForCleanup(lease, currentKinesisShardIds, multiStreamArgs)) {
log.info("Deleting lease for shard {} as it is not present in Kinesis stream.", lease.leaseKey());
leaseRefresher.deleteLease(lease);
}
}
}
}
/**
* Note: This method has package level access, solely for testing purposes.
*
@ -652,91 +462,6 @@ public class HierarchicalShardSyncer {
return isCandidateForCleanup;
}
/**
* Private helper method.
* Clean up leases for shards that meet the following criteria:
* a/ the shard has been fully processed (checkpoint is set to SHARD_END)
* b/ we've begun processing all the child shards: we have leases for all child shards and their checkpoint is not
* TRIM_HORIZON.
*
* @param currentLeases List of leases we evaluate for clean up
* @param shardIdToShardMap Map of shardId->Shard (assumed to include all Kinesis shards)
* @param shardIdToChildShardIdsMap Map of shardId->childShardIds (assumed to include all Kinesis shards)
* @param trackedLeases List of all leases we are tracking.
* @param leaseRefresher Lease refresher (will be used to delete leases)
* @throws DependencyException
* @throws InvalidStateException
* @throws ProvisionedThroughputException
* @throws KinesisClientLibIOException
*/
private synchronized void cleanupLeasesOfFinishedShards(final Collection<Lease> currentLeases,
final Map<String, Shard> shardIdToShardMap, final Map<String, Set<String>> shardIdToChildShardIdsMap,
final List<Lease> trackedLeases, final LeaseRefresher leaseRefresher,
final MultiStreamArgs multiStreamArgs) throws DependencyException,
InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
// Leases whose shards have been fully consumed (checkpoint == SHARD_END).
final List<Lease> leasesOfClosedShards = currentLeases.stream()
.filter(lease -> lease.checkpoint().equals(ExtendedSequenceNumber.SHARD_END))
.collect(Collectors.toList());
final Set<String> shardIdsOfClosedShards = leasesOfClosedShards.stream()
.map(lease -> shardIdFromLeaseDeducer.apply(lease, multiStreamArgs)).collect(Collectors.toSet());
if (!CollectionUtils.isNullOrEmpty(leasesOfClosedShards)) {
// Sanity-check the shard list before deleting anything: every closed shard must either be
// absent from Kinesis or have its hash range covered by children.
assertClosedShardsAreCoveredOrAbsent(shardIdToShardMap, shardIdToChildShardIdsMap, shardIdsOfClosedShards);
// Process in starting-sequence-number order so parents are handled before children.
Comparator<? super Lease> startingSequenceNumberComparator = new StartingSequenceNumberAndShardIdBasedComparator(
shardIdToShardMap, multiStreamArgs);
leasesOfClosedShards.sort(startingSequenceNumberComparator);
// Index all tracked leases by their deduced shard id for quick child lookups.
final Map<String, Lease> trackedLeaseMap = trackedLeases.stream()
.collect(Collectors.toMap(lease -> shardIdFromLeaseDeducer.apply(lease, multiStreamArgs), Function.identity()));
for (Lease leaseOfClosedShard : leasesOfClosedShards) {
final String closedShardId = shardIdFromLeaseDeducer.apply(leaseOfClosedShard, multiStreamArgs);
final Set<String> childShardIds = shardIdToChildShardIdsMap.get(closedShardId);
// Only attempt cleanup when the shard id is known and there are children to hand off to.
if (closedShardId != null && !CollectionUtils.isNullOrEmpty(childShardIds)) {
cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseRefresher, multiStreamArgs);
}
}
}
}
/**
* Delete lease for the closed shard. Rules for deletion are:
* a/ the checkpoint for the closed shard is SHARD_END,
* b/ there are leases for all the childShardIds and their checkpoint is NOT TRIM_HORIZON
* Note: This method has package level access solely for testing purposes.
*
* @param closedShardId Identifies the closed shard
* @param childShardIds ShardIds of children of the closed shard
* @param trackedLeases shardId->Lease map with all leases we are tracking (should not be null)
* @param leaseRefresher
* @throws ProvisionedThroughputException
* @throws InvalidStateException
* @throws DependencyException
*/
/**
 * Delete lease for the closed shard. Rules for deletion are:
 * a/ the checkpoint for the closed shard is SHARD_END,
 * b/ there are leases for all the childShardIds and their checkpoint is NOT TRIM_HORIZON
 * Note: This method has package level access solely for testing purposes.
 *
 * @param closedShardId Identifies the closed shard
 * @param childShardIds ShardIds of children of the closed shard
 * @param trackedLeases shardId->Lease map with all leases we are tracking (should not be null)
 * @param leaseRefresher
 * @throws ProvisionedThroughputException
 * @throws InvalidStateException
 * @throws DependencyException
 */
synchronized void cleanupLeaseForClosedShard(final String closedShardId, final Set<String> childShardIds,
        final Map<String, Lease> trackedLeases, final LeaseRefresher leaseRefresher, final MultiStreamArgs multiStreamArgs)
        throws DependencyException, InvalidStateException, ProvisionedThroughputException {
    final Lease closedShardLease = trackedLeases.get(closedShardId);
    final List<Lease> childLeases = childShardIds.stream()
            .map(trackedLeases::get)
            .filter(Objects::nonNull)
            .collect(Collectors.toList());

    // Guard clauses: bail out unless the parent lease exists and is at SHARD_END.
    if (closedShardLease == null || !closedShardLease.checkpoint().equals(ExtendedSequenceNumber.SHARD_END)) {
        return;
    }
    // Every child shard must already have a tracked lease before we delete the parent.
    if (childLeases.size() != childShardIds.size()) {
        return;
    }

    // Deletion is safe only once no child is still waiting at TRIM_HORIZON.
    final boolean anyChildAtTrimHorizon = childLeases.stream()
            .anyMatch(lease -> lease.checkpoint().equals(ExtendedSequenceNumber.TRIM_HORIZON));
    if (!anyChildAtTrimHorizon) {
        log.info("Deleting lease for shard {} as it has been completely processed and processing of child "
                + "shards has begun.", shardIdFromLeaseDeducer.apply(closedShardLease, multiStreamArgs));
        leaseRefresher.deleteLease(closedShardLease);
    }
}
/**
* Helper method to create a new Lease POJO for a shard.
* Note: Package level access only for testing purposes
@ -744,7 +469,7 @@ public class HierarchicalShardSyncer {
* @param shard
* @return
*/
private static Lease newKCLLease(final Shard shard) {
static Lease newKCLLease(final Shard shard) {
Lease newLease = new Lease();
newLease.leaseKey(shard.shardId());
List<String> parentShardIds = new ArrayList<>(2);
@ -760,7 +485,7 @@ public class HierarchicalShardSyncer {
return newLease;
}
private static Lease newKCLMultiStreamLease(final Shard shard, final StreamIdentifier streamIdentifier) {
static Lease newKCLMultiStreamLease(final Shard shard, final StreamIdentifier streamIdentifier) {
MultiStreamLease newLease = new MultiStreamLease();
newLease.leaseKey(MultiStreamLease.getLeaseKey(streamIdentifier.serialize(), shard.shardId()));
List<String> parentShardIds = new ArrayList<>(2);
@ -799,7 +524,7 @@ public class HierarchicalShardSyncer {
.peek(shard -> log.debug("Found open shard: {}", shard.shardId())).collect(Collectors.toList());
}
private static ExtendedSequenceNumber convertToCheckpoint(final InitialPositionInStreamExtended position) {
static ExtendedSequenceNumber convertToCheckpoint(final InitialPositionInStreamExtended position) {
ExtendedSequenceNumber checkpoint = null;
if (position.getInitialPositionInStream().equals(InitialPositionInStream.TRIM_HORIZON)) {
@ -817,7 +542,7 @@ public class HierarchicalShardSyncer {
*
*/
@RequiredArgsConstructor
private static class StartingSequenceNumberAndShardIdBasedComparator implements Comparator<Lease>, Serializable {
static class StartingSequenceNumberAndShardIdBasedComparator implements Comparator<Lease>, Serializable {
private static final long serialVersionUID = 1L;
private final Map<String, Shard> shardIdToShardMap;

View file

@ -42,6 +42,7 @@ import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
import software.amazon.awssdk.services.kinesis.model.ResourceInUseException;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardFilter;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.FutureUtils;
@ -149,12 +150,18 @@ public class KinesisShardDetector implements ShardDetector {
@Override
@Synchronized
public List<Shard> listShards() {
return listShardsWithFilter(null);
}
@Override
@Synchronized
public List<Shard> listShardsWithFilter(ShardFilter shardFilter) {
final List<Shard> shards = new ArrayList<>();
ListShardsResponse result;
String nextToken = null;
do {
result = listShards(nextToken);
result = listShards(shardFilter, nextToken);
if (result == null) {
/*
@ -172,13 +179,13 @@ public class KinesisShardDetector implements ShardDetector {
return shards;
}
private ListShardsResponse listShards(final String nextToken) {
private ListShardsResponse listShards(ShardFilter shardFilter, final String nextToken) {
final AWSExceptionManager exceptionManager = new AWSExceptionManager();
exceptionManager.add(LimitExceededException.class, t -> t);
exceptionManager.add(ResourceInUseException.class, t -> t);
exceptionManager.add(KinesisException.class, t -> t);
ListShardsRequest.Builder request = KinesisRequestsBuilder.listShardsRequestBuilder();
ListShardsRequest.Builder request = KinesisRequestsBuilder.listShardsRequestBuilder().shardFilter(shardFilter);
if (StringUtils.isEmpty(nextToken)) {
request = request.streamName(streamIdentifier.streamName());
} else {

View file

@ -0,0 +1,34 @@
package software.amazon.kinesis.leases;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import java.util.List;
import java.util.Set;
import java.util.function.BiFunction;
/**
 * Strategy for synchronizing the lease table with the current set of Kinesis shards. The
 * implementation chosen depends on whether the lease table already contains leases.
 */
public interface LeaseSynchronizer {
// Deduces the Kinesis shard id from a lease: multi-stream leases carry an explicit shardId,
// while single-stream leases use the lease key itself as the shard id.
// NOTE: interface fields are implicitly public static final.
BiFunction<Lease, HierarchicalShardSyncer.MultiStreamArgs, String> shardIdFromLeaseDeducer =
(lease, multiStreamArgs) ->
multiStreamArgs.isMultiStreamMode() ?
((MultiStreamLease) lease).shardId() :
lease.leaseKey();
/**
 * Determines which new leases should be created for the given shards.
 *
 * @param shards all shards retrieved for the stream
 * @param currentLeases leases that already exist in the lease table
 * @param initialPosition position used to seed checkpoints for new leases
 * @param inconsistentShardIds child shard ids whose parents are still open
 * @param multiStreamArgs multi-stream configuration
 * @return leases to create
 */
List<Lease> determineNewLeasesToCreate(List<Shard> shards, List<Lease> currentLeases,
InitialPositionInStreamExtended initialPosition, Set<String> inconsistentShardIds,
HierarchicalShardSyncer.MultiStreamArgs multiStreamArgs);
/**
 * Deletes leases that refer to shards which no longer exist in the stream.
 */
void cleanupGarbageLeases(List<Shard> shards, LeaseRefresher leaseRefresher, List<Lease> trackedLeases,
HierarchicalShardSyncer.MultiStreamArgs multiStreamArgs)
throws DependencyException, ProvisionedThroughputException, InvalidStateException;
/**
 * Deletes leases for shards that have been fully processed and whose child shards have
 * begun processing.
 */
void cleanupLeasesOfFinishedShards(LeaseRefresher leaseRefresher, List<Lease> currentLeases, List<Lease> trackedLeases,
HierarchicalShardSyncer.MultiStreamArgs multiStreamArgs)
throws DependencyException, ProvisionedThroughputException, InvalidStateException;
}

View file

@ -0,0 +1,314 @@
package software.amazon.kinesis.leases;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import static software.amazon.kinesis.leases.HierarchicalShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors;
import static software.amazon.kinesis.leases.HierarchicalShardSyncer.constructShardIdToShardMap;
@Slf4j
@AllArgsConstructor
public class NonEmptyLeaseTableSynchronizer implements LeaseSynchronizer {

    // Used to re-fetch the shard list before deleting garbage-lease candidates.
    private final ShardDetector shardDetector;
    // shardId -> Shard for the shards known at construction time.
    private final Map<String, Shard> shardIdToShardMap;
    // shardId -> ids of its child shards, derived from shardIdToShardMap.
    private final Map<String, Set<String>> shardIdToChildShardIdsMap;

    /**
     * Determine new leases to create and their initial checkpoint.
     * Note: Package level access only for testing purposes.
     * <p>
     * For each open (no ending sequence number) shard without open parents that doesn't already have a lease,
     * determine if it is a descendent of any shard which is or will be processed (e.g. for which a lease exists):
     * If so, set checkpoint of the shard to TrimHorizon and also create leases for ancestors if needed.
     * If not, set checkpoint of the shard to the initial position specified by the client.
     * To check if we need to create leases for ancestors, we use the following rules:
     * * If we began (or will begin) processing data for a shard, then we must reach end of that shard before
     * we begin processing data from any of its descendants.
     * * A shard does not start processing data until data from all its parents has been processed.
     * Note, if the initial position is LATEST and a shard has two parents and only one is a descendant - we'll create
     * leases corresponding to both the parents - the parent shard which is not a descendant will have
     * its checkpoint set to Latest.
     * <p>
     * We assume that if there is an existing lease for a shard, then either:
     * * we have previously created a lease for its parent (if it was needed), or
     * * the parent shard has expired.
     * <p>
     * For example:
     * Shard structure (each level depicts a stream segment):
     * 0 1 2 3 4 5 - shards till epoch 102
     * \ / \ / | |
     * 6 7 4 5 - shards from epoch 103 - 205
     * \ / | / \
     * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
     * Current leases: (3, 4, 5)
     * New leases to create: (2, 6, 7, 8, 9, 10)
     * <p>
     * The leases returned are sorted by the starting sequence number - following the same order
     * when persisting the leases in DynamoDB will ensure that we recover gracefully if we fail
     * before creating all the leases.
     * <p>
     * If a shard has no existing lease, is open, and is a descendant of a parent which is still open, we ignore it
     * here; this happens when the list of shards is inconsistent, which could be due to pagination delay for very
     * high shard count streams (i.e., dynamodb streams for tables with thousands of partitions). This can only
     * currently happen here if ignoreUnexpectedChildShards was true in syncShardLeases.
     *
     * @param shards all shards of the stream
     * @param currentLeases leases that already exist in the lease table
     * @param initialPosition initial position in the stream for new, non-descendant leases
     * @param inconsistentShardIds open shards whose parents are also open; no leases are created for these
     * @param multiStreamArgs multi-stream mode flag and stream identifier
     * @return List of new leases to create sorted by starting sequenceNumber of the corresponding shard
     */
    @Override
    public synchronized List<Lease> determineNewLeasesToCreate(List<Shard> shards, List<Lease> currentLeases,
            InitialPositionInStreamExtended initialPosition, Set<String> inconsistentShardIds,
            HierarchicalShardSyncer.MultiStreamArgs multiStreamArgs) {
        final Map<String, Lease> shardIdToNewLeaseMap = new HashMap<>();
        final Map<String, Shard> shardIdToShardMapOfAllKinesisShards = constructShardIdToShardMap(shards);
        final Set<String> shardIdsOfCurrentLeases = currentLeases.stream()
                .peek(lease -> log.debug("Existing lease: {}", lease))
                .map(lease -> shardIdFromLeaseDeducer.apply(lease, multiStreamArgs))
                .collect(Collectors.toSet());
        final List<Shard> openShards = HierarchicalShardSyncer.getOpenShards(shards);
        // Memoizes the isDescendant decision per shard so ancestor chains are walked only once.
        final Map<String, Boolean> memoizationContext = new HashMap<>();
        // Iterate over the open shards and find those that don't have any lease entries.
        for (Shard shard : openShards) {
            final String shardId = shard.shardId();
            log.debug("Evaluating leases for open shard {} and its ancestors.", shardId);
            if (shardIdsOfCurrentLeases.contains(shardId)) {
                log.debug("Lease for shardId {} already exists. Not creating a lease", shardId);
            } else if (inconsistentShardIds.contains(shardId)) {
                log.info("shardId {} is an inconsistent child. Not creating a lease", shardId);
            } else {
                log.debug("Need to create a lease for shardId {}", shardId);
                final Lease newLease = multiStreamArgs.isMultiStreamMode() ?
                        HierarchicalShardSyncer.newKCLMultiStreamLease(shard, multiStreamArgs.streamIdentifier()) :
                        HierarchicalShardSyncer.newKCLLease(shard);
                // May also add ancestor leases to shardIdToNewLeaseMap as a side effect.
                final boolean isDescendant = checkIfDescendantAndAddNewLeasesForAncestors(shardId, initialPosition,
                        shardIdsOfCurrentLeases, shardIdToShardMapOfAllKinesisShards, shardIdToNewLeaseMap,
                        memoizationContext, multiStreamArgs);

                /**
                 * If the shard is a descendant and the specified initial position is AT_TIMESTAMP, then the
                 * checkpoint should be set to AT_TIMESTAMP, else to TRIM_HORIZON. For AT_TIMESTAMP, we will add a
                 * lease just like we do for TRIM_HORIZON. However we will only return back records with server-side
                 * timestamp at or after the specified initial position timestamp.
                 *
                 * Shard structure (each level depicts a stream segment):
                 * 0 1 2 3 4 5 - shards till epoch 102
                 * \ / \ / | |
                 * 6 7 4 5 - shards from epoch 103 - 205
                 * \ / | /\
                 * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
                 *
                 * Current leases: empty set
                 *
                 * For the above example, suppose the initial position in stream is set to AT_TIMESTAMP with
                 * timestamp value 206. We will then create new leases for all the shards (with checkpoint set to
                 * AT_TIMESTAMP), including the ancestor shards with epoch less than 206. However as we begin
                 * processing the ancestor shards, their checkpoints would be updated to SHARD_END and their leases
                 * would then be deleted since they won't have records with server-side timestamp at/after 206. And
                 * after that we will begin processing the descendant shards with epoch at/after 206 and we will
                 * return the records that meet the timestamp requirement for these shards.
                 */
                if (isDescendant
                        && !initialPosition.getInitialPositionInStream().equals(InitialPositionInStream.AT_TIMESTAMP)) {
                    newLease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
                } else {
                    newLease.checkpoint(HierarchicalShardSyncer.convertToCheckpoint(initialPosition));
                }
                log.debug("Set checkpoint of {} to {}", newLease.leaseKey(), newLease.checkpoint());
                shardIdToNewLeaseMap.put(shardId, newLease);
            }
        }

        final List<Lease> newLeasesToCreate = new ArrayList<>(shardIdToNewLeaseMap.values());
        final Comparator<Lease> startingSequenceNumberComparator = new HierarchicalShardSyncer.StartingSequenceNumberAndShardIdBasedComparator(
                shardIdToShardMapOfAllKinesisShards, multiStreamArgs);
        newLeasesToCreate.sort(startingSequenceNumberComparator);
        return newLeasesToCreate;
    }

    /**
     * Deletes leases that refer to shards no longer present in the stream. Before deleting, the shard
     * list is re-fetched from Kinesis so that shards created after the original listing are not
     * mistaken for garbage.
     *
     * @param shards shards of the stream as of the last sync
     * @param leaseRefresher used to delete the garbage leases
     * @param trackedLeases all leases currently being tracked
     * @param multiStreamArgs multi-stream mode flag and stream identifier
     */
    @Override
    public synchronized void cleanupGarbageLeases(List<Shard> shards, LeaseRefresher leaseRefresher,
            List<Lease> trackedLeases, HierarchicalShardSyncer.MultiStreamArgs multiStreamArgs)
            throws DependencyException, ProvisionedThroughputException, InvalidStateException {
        final Set<String> kinesisShards = shards.stream().map(Shard::shardId).collect(Collectors.toSet());

        // Check if there are leases for non-existent shards
        final List<Lease> garbageLeases = trackedLeases.stream()
                .filter(lease -> HierarchicalShardSyncer.isCandidateForCleanup(lease, kinesisShards, multiStreamArgs)).collect(Collectors.toList());

        if (!CollectionUtils.isNullOrEmpty(garbageLeases)) {
            log.info("Found {} candidate leases for cleanup. Refreshing list of"
                    + " Kinesis shards to pick up recent/latest shards", garbageLeases.size());
            final Set<String> currentKinesisShardIds = HierarchicalShardSyncer.getFullShardList(shardDetector).stream().map(Shard::shardId)
                    .collect(Collectors.toSet());

            for (Lease lease : garbageLeases) {
                // Re-check against the refreshed shard list before deleting.
                if (HierarchicalShardSyncer.isCandidateForCleanup(lease, currentKinesisShardIds, multiStreamArgs)) {
                    log.info("Deleting lease for shard {} as it is not present in Kinesis stream.", lease.leaseKey());
                    leaseRefresher.deleteLease(lease);
                }
            }
        }
    }

    /**
     * Deletes leases of shards whose checkpoint is SHARD_END, provided the closed shards' hash key
     * ranges are fully covered by their children and processing of the children has begun.
     *
     * @param leaseRefresher used to delete leases of finished shards
     * @param currentLeases leases that currently exist in the lease table
     * @param trackedLeases all leases currently being tracked
     * @param multiStreamArgs multi-stream mode flag and stream identifier
     */
    @Override
    public synchronized void cleanupLeasesOfFinishedShards(LeaseRefresher leaseRefresher, List<Lease> currentLeases,
            List<Lease> trackedLeases, HierarchicalShardSyncer.MultiStreamArgs multiStreamArgs)
            throws DependencyException, ProvisionedThroughputException, InvalidStateException {
        final List<Lease> leasesOfClosedShards = currentLeases.stream()
                .filter(lease -> lease.checkpoint().equals(ExtendedSequenceNumber.SHARD_END))
                .collect(Collectors.toList());
        final Set<String> shardIdsOfClosedShards = leasesOfClosedShards.stream()
                .map(lease -> shardIdFromLeaseDeducer.apply(lease, multiStreamArgs)).collect(Collectors.toSet());

        if (!CollectionUtils.isNullOrEmpty(leasesOfClosedShards)) {
            // Guard against acting on an incomplete shard list (e.g. mid-reshard).
            assertClosedShardsAreCoveredOrAbsent(shardIdToShardMap, shardIdToChildShardIdsMap, shardIdsOfClosedShards);
            Comparator<? super Lease> startingSequenceNumberComparator =
                    new HierarchicalShardSyncer.StartingSequenceNumberAndShardIdBasedComparator(shardIdToShardMap, multiStreamArgs);
            leasesOfClosedShards.sort(startingSequenceNumberComparator);
            final Map<String, Lease> trackedLeaseMap = trackedLeases.stream()
                    .collect(Collectors.toMap(lease -> shardIdFromLeaseDeducer.apply(lease, multiStreamArgs), Function.identity()));

            for (Lease leaseOfClosedShard : leasesOfClosedShards) {
                final String closedShardId = shardIdFromLeaseDeducer.apply(leaseOfClosedShard, multiStreamArgs);
                final Set<String> childShardIds = shardIdToChildShardIdsMap.get(closedShardId);
                if (closedShardId != null && !CollectionUtils.isNullOrEmpty(childShardIds)) {
                    cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseRefresher, multiStreamArgs);
                }
            }
        }
    }

    /**
     * Note: this has package level access for testing purposes.
     * Useful for asserting that we don't have an incomplete shard list following a reshard operation.
     * We verify that if the shard is present in the shard list, it is closed and its hash key range
     * is covered by its child shards.
     *
     * @param shardIdToShardMap shardId -> Shard mapping for the shards known to this synchronizer
     * @param shardIdToChildShardIdsMap shardId -> ids of its child shards
     * @param shardIdsOfClosedShards ids of the shards which are expected to be closed
     * @throws KinesisClientLibIOException if a shard is not closed or its children are missing,
     *         which indicates the shard list was constructed while a reshard was in progress
     */
    synchronized void assertClosedShardsAreCoveredOrAbsent(final Map<String, Shard> shardIdToShardMap,
            final Map<String, Set<String>> shardIdToChildShardIdsMap,
            final Set<String> shardIdsOfClosedShards)
            throws KinesisClientLibIOException {
        final String exceptionMessageSuffix = "This can happen if we constructed the list of shards "
                + " while a reshard operation was in progress.";

        for (String shardId : shardIdsOfClosedShards) {
            final Shard shard = shardIdToShardMap.get(shardId);
            if (shard == null) {
                log.info("Shard {} is not present in Kinesis anymore.", shardId);
                continue;
            }

            final String endingSequenceNumber = shard.sequenceNumberRange().endingSequenceNumber();
            if (endingSequenceNumber == null) {
                // Fixed: report the offending shard id, not the whole set of closed shard ids.
                throw new KinesisClientLibIOException("Shard " + shardId
                        + " is not closed. " + exceptionMessageSuffix);
            }

            final Set<String> childShardIds = shardIdToChildShardIdsMap.get(shardId);
            if (childShardIds == null) {
                throw new KinesisClientLibIOException("Incomplete shard list: Closed shard " + shardId
                        + " has no children." + exceptionMessageSuffix);
            }

            assertHashRangeOfClosedShardIsCovered(shard, shardIdToShardMap, childShardIds);
        }
    }

    /**
     * Verifies that the union of the child shards' hash key ranges covers the closed shard's
     * entire hash key range.
     *
     * @param closedShard the closed shard whose range must be covered
     * @param shardIdToShardMap shardId -> Shard mapping used to resolve the children
     * @param childShardIds ids of the closed shard's children
     * @throws KinesisClientLibIOException if the children do not cover the closed shard's range
     */
    private synchronized void assertHashRangeOfClosedShardIsCovered(final Shard closedShard,
            final Map<String, Shard> shardIdToShardMap, final Set<String> childShardIds)
            throws KinesisClientLibIOException {
        BigInteger minStartingHashKeyOfChildren = null;
        BigInteger maxEndingHashKeyOfChildren = null;

        final BigInteger startingHashKeyOfClosedShard = new BigInteger(closedShard.hashKeyRange().startingHashKey());
        final BigInteger endingHashKeyOfClosedShard = new BigInteger(closedShard.hashKeyRange().endingHashKey());

        for (String childShardId : childShardIds) {
            final Shard childShard = shardIdToShardMap.get(childShardId);
            final BigInteger startingHashKey = new BigInteger(childShard.hashKeyRange().startingHashKey());
            if (minStartingHashKeyOfChildren == null || startingHashKey.compareTo(minStartingHashKeyOfChildren) < 0) {
                minStartingHashKeyOfChildren = startingHashKey;
            }

            final BigInteger endingHashKey = new BigInteger(childShard.hashKeyRange().endingHashKey());
            if (maxEndingHashKeyOfChildren == null || endingHashKey.compareTo(maxEndingHashKeyOfChildren) > 0) {
                maxEndingHashKeyOfChildren = endingHashKey;
            }
        }

        // NOTE(review): this only checks the min/max bounds of the children; it assumes the
        // children's ranges are contiguous in between (guaranteed by Kinesis resharding).
        if (minStartingHashKeyOfChildren == null || maxEndingHashKeyOfChildren == null
                || minStartingHashKeyOfChildren.compareTo(startingHashKeyOfClosedShard) > 0
                || maxEndingHashKeyOfChildren.compareTo(endingHashKeyOfClosedShard) < 0) {
            throw new KinesisClientLibIOException(String.format(
                    "Incomplete shard list: hash key range of shard %s is not covered by its child shards.",
                    closedShard.shardId()));
        }
    }

    /**
     * Delete lease for the closed shard. Rules for deletion are:
     * a/ the checkpoint for the closed shard is SHARD_END,
     * b/ there are leases for all the childShardIds and their checkpoint is NOT TRIM_HORIZON
     * Note: This method has package level access solely for testing purposes.
     *
     * @param closedShardId Identifies the closed shard
     * @param childShardIds ShardIds of children of the closed shard
     * @param trackedLeases shardId->Lease map with all leases we are tracking (should not be null)
     * @param leaseRefresher used to delete the closed shard's lease
     * @param multiStreamArgs multi-stream mode flag and stream identifier
     * @throws ProvisionedThroughputException
     * @throws InvalidStateException
     * @throws DependencyException
     */
    synchronized void cleanupLeaseForClosedShard(final String closedShardId, final Set<String> childShardIds,
            final Map<String, Lease> trackedLeases, final LeaseRefresher leaseRefresher, final HierarchicalShardSyncer.MultiStreamArgs multiStreamArgs)
            throws DependencyException, InvalidStateException, ProvisionedThroughputException {
        final Lease leaseForClosedShard = trackedLeases.get(closedShardId);
        final List<Lease> childShardLeases = childShardIds.stream().map(trackedLeases::get).filter(Objects::nonNull)
                .collect(Collectors.toList());

        if (leaseForClosedShard != null && leaseForClosedShard.checkpoint().equals(ExtendedSequenceNumber.SHARD_END)
                && childShardLeases.size() == childShardIds.size()) {
            boolean okayToDelete = true;
            for (Lease lease : childShardLeases) {
                // A child still at TRIM_HORIZON has not begun processing; keep the parent lease.
                if (lease.checkpoint().equals(ExtendedSequenceNumber.TRIM_HORIZON)) {
                    okayToDelete = false;
                    break;
                }
            }

            if (okayToDelete) {
                log.info("Deleting lease for shard {} as it has been completely processed and processing of child "
                        + "shards has begun.", shardIdFromLeaseDeducer.apply(leaseForClosedShard, multiStreamArgs));
                leaseRefresher.deleteLease(leaseForClosedShard);
            }
        }
    }
}

View file

@ -16,6 +16,7 @@
package software.amazon.kinesis.leases;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardFilter;
import software.amazon.kinesis.common.StreamIdentifier;
import java.util.List;
@ -28,6 +29,8 @@ public interface ShardDetector {
List<Shard> listShards();
List<Shard> listShardsWithFilter(ShardFilter shardFilter);
default StreamIdentifier streamIdentifier() {
throw new UnsupportedOperationException("StreamName not available");
}

View file

@ -64,8 +64,9 @@ public class ShardSyncTask implements ConsumerTask {
final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, SHARD_SYNC_TASK_OPERATION);
try {
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition,
cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, scope);
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher,
initialPosition, cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, scope);
if (shardSyncTaskIdleTimeMillis > 0) {
Thread.sleep(shardSyncTaskIdleTimeMillis);
}

View file

@ -112,9 +112,10 @@ public class HierarchicalShardSyncerTest {
public void testDetermineNewLeasesToCreateNoShards() {
final List<Shard> shards = Collections.emptyList();
final List<Lease> leases = Collections.emptyList();
final LeaseSynchronizer emptyLeaseTableSynchronizer = new EmptyLeaseTableSynchronizer();
assertThat(HierarchicalShardSyncer.determineNewLeasesToCreate(shards, leases, INITIAL_POSITION_LATEST).isEmpty(),
equalTo(true));
assertThat(HierarchicalShardSyncer.determineNewLeasesToCreate(emptyLeaseTableSynchronizer, shards, leases,
INITIAL_POSITION_LATEST).isEmpty(), equalTo(true));
}
/**
@ -123,10 +124,11 @@ public class HierarchicalShardSyncerTest {
@Test public void testDetermineNewLeasesToCreateNoShardsForMultiStream() {
final List<Shard> shards = Collections.emptyList();
final List<Lease> leases = Collections.emptyList();
final LeaseSynchronizer emptyLeaseTableSynchronizer = new EmptyLeaseTableSynchronizer();
assertThat(HierarchicalShardSyncer
.determineNewLeasesToCreate(shards, leases, INITIAL_POSITION_LATEST, new HashSet<>(), MULTI_STREAM_ARGS)
.isEmpty(), equalTo(true));
.determineNewLeasesToCreate(emptyLeaseTableSynchronizer, shards, leases, INITIAL_POSITION_LATEST,
new HashSet<>(), MULTI_STREAM_ARGS).isEmpty(), equalTo(true));
}
/**
@ -141,9 +143,10 @@ public class HierarchicalShardSyncerTest {
final List<Shard> shards = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange),
ShardObjectHelper.newShard(shardId1, null, null, sequenceRange));
final List<Lease> currentLeases = Collections.emptyList();
final LeaseSynchronizer emptyLeaseTableSynchronizer = new EmptyLeaseTableSynchronizer();
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
INITIAL_POSITION_LATEST);
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(emptyLeaseTableSynchronizer,
shards, currentLeases, INITIAL_POSITION_LATEST);
final Set<String> newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final Set<String> expectedLeaseShardIds = new HashSet<>(Arrays.asList(shardId0, shardId1));
@ -163,9 +166,10 @@ public class HierarchicalShardSyncerTest {
final List<Shard> shards = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange),
ShardObjectHelper.newShard(shardId1, null, null, sequenceRange));
final List<Lease> currentLeases = Collections.emptyList();
final LeaseSynchronizer emptyLeaseTableSynchronizer = new EmptyLeaseTableSynchronizer();
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
INITIAL_POSITION_LATEST, new HashSet<>(), MULTI_STREAM_ARGS);
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(emptyLeaseTableSynchronizer,
shards, currentLeases, INITIAL_POSITION_LATEST, new HashSet<>(), MULTI_STREAM_ARGS);
final Set<String> newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final Set<String> expectedLeaseIds = new HashSet<>(
toMultiStreamLeaseList(Arrays.asList(shardId0, shardId1)));
@ -174,59 +178,6 @@ public class HierarchicalShardSyncerTest {
assertThat(newLeaseKeys, equalTo(expectedLeaseIds));
}
/**
* Test determineNewLeasesToCreate() where there are no leases and no resharding operations have been performed, but
* one of the shards was marked as inconsistent.
*/
@Test
public void testDetermineNewLeasesToCreate0Leases0Reshards1Inconsistent() {
final String shardId0 = "shardId-0";
final String shardId1 = "shardId-1";
final String shardId2 = "shardId-2";
final SequenceNumberRange sequenceRange = ShardObjectHelper.newSequenceNumberRange("342980", null);
final List<Shard> shards = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange),
ShardObjectHelper.newShard(shardId1, null, null, sequenceRange),
ShardObjectHelper.newShard(shardId2, shardId1, null, sequenceRange));
final List<Lease> currentLeases = Collections.emptyList();
final Set<String> inconsistentShardIds = new HashSet<>(Collections.singletonList(shardId2));
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
INITIAL_POSITION_LATEST, inconsistentShardIds);
final Set<String> newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final Set<String> expectedLeaseShardIds = new HashSet<>(Arrays.asList(shardId0, shardId1));
assertThat(newLeases.size(), equalTo(expectedLeaseShardIds.size()));
assertThat(newLeaseKeys, equalTo(expectedLeaseShardIds));
}
/**
* Test determineNewLeasesToCreate() where there are no leases and no resharding operations have been performed, but
* one of the shards was marked as inconsistent.
*/
@Test
public void testDetermineNewLeasesToCreate0Leases0Reshards1InconsistentMultiStream() {
final String shardId0 = "shardId-0";
final String shardId1 = "shardId-1";
final String shardId2 = "shardId-2";
final SequenceNumberRange sequenceRange = ShardObjectHelper.newSequenceNumberRange("342980", null);
final List<Shard> shards = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange),
ShardObjectHelper.newShard(shardId1, null, null, sequenceRange),
ShardObjectHelper.newShard(shardId2, shardId1, null, sequenceRange));
final List<Lease> currentLeases = Collections.emptyList();
final Set<String> inconsistentShardIds = new HashSet<>(Collections.singletonList(shardId2));
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
INITIAL_POSITION_LATEST, inconsistentShardIds, MULTI_STREAM_ARGS);
final Set<String> newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final Set<String> expectedLeaseShardIds = new HashSet<>(
toMultiStreamLeaseList(Arrays.asList(shardId0, shardId1)));
assertThat(newLeases.size(), equalTo(expectedLeaseShardIds.size()));
assertThat(newLeaseKeys, equalTo(expectedLeaseShardIds));
}
/**
* Test bootstrapShardLeases() starting at TRIM_HORIZON ("beginning" of stream)
*/
@ -1069,6 +1020,7 @@ public class HierarchicalShardSyncerTest {
final String shardId0 = "shardId-0";
final String shardId1 = "shardId-1";
final List<Lease> currentLeases = new ArrayList<>();
final LeaseSynchronizer emptyLeaseTableSynchronizer = new EmptyLeaseTableSynchronizer();
final SequenceNumberRange sequenceRange = ShardObjectHelper.newSequenceNumberRange("342980", null);
final List<Shard> shards = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange),
@ -1080,7 +1032,7 @@ public class HierarchicalShardSyncerTest {
final Set<String> expectedLeaseShardIds = new HashSet<>(Arrays.asList(shardId0, shardId1));
for (InitialPositionInStreamExtended initialPosition : initialPositions) {
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(emptyLeaseTableSynchronizer, shards, currentLeases,
initialPosition);
assertThat(newLeases.size(), equalTo(2));
@ -1092,24 +1044,6 @@ public class HierarchicalShardSyncerTest {
}
}
@Test
public void testDetermineNewLeasesToCreateIgnoreClosedShard() {
final String lastShardId = "shardId-1";
final List<Lease> currentLeases = new ArrayList<>();
final List<Shard> shards = Arrays.asList(
ShardObjectHelper.newShard("shardId-0", null, null,
ShardObjectHelper.newSequenceNumberRange("303", "404")),
ShardObjectHelper.newShard(lastShardId, null, null,
ShardObjectHelper.newSequenceNumberRange("405", null)));
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
INITIAL_POSITION_LATEST);
assertThat(newLeases.size(), equalTo(1));
assertThat(newLeases.get(0).leaseKey(), equalTo(lastShardId));
}
// /**
// * Test CheckIfDescendantAndAddNewLeasesForAncestors (initial position Latest)
// * Shard structure (each level depicts a stream segment):
@ -1125,9 +1059,15 @@ public class HierarchicalShardSyncerTest {
final List<Shard> shards = constructShardListForGraphA();
final List<Lease> currentLeases = Arrays.asList(newLease("shardId-3"), newLease("shardId-4"),
newLease("shardId-5"));
final Map<String, Shard> shardIdToShardMap = HierarchicalShardSyncer.constructShardIdToShardMap(shards);
final Map<String, Set<String>> shardIdToChildShardIdsMap = HierarchicalShardSyncer
.constructShardIdToChildShardIdsMap(shardIdToShardMap);
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
INITIAL_POSITION_LATEST);
final LeaseSynchronizer nonEmptyLeaseTableSynchronizer =
new NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap);
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(nonEmptyLeaseTableSynchronizer,
shards, currentLeases, INITIAL_POSITION_LATEST);
final Map<String, ExtendedSequenceNumber> expectedShardIdCheckpointMap = new HashMap<>();
expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.TRIM_HORIZON);
@ -1161,8 +1101,15 @@ public class HierarchicalShardSyncerTest {
final List<Lease> currentLeases = Arrays.asList(newLease("shardId-4"), newLease("shardId-5"),
newLease("shardId-7"));
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
INITIAL_POSITION_LATEST);
final Map<String, Shard> shardIdToShardMap = HierarchicalShardSyncer.constructShardIdToShardMap(shards);
final Map<String, Set<String>> shardIdToChildShardIdsMap = HierarchicalShardSyncer
.constructShardIdToChildShardIdsMap(shardIdToShardMap);
final LeaseSynchronizer nonEmptyLeaseTableSynchronizer =
new NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap);
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(nonEmptyLeaseTableSynchronizer,
shards, currentLeases, INITIAL_POSITION_LATEST);
final Map<String, ExtendedSequenceNumber> expectedShardIdCheckpointMap = new HashMap<>();
expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.TRIM_HORIZON);
@ -1194,8 +1141,15 @@ public class HierarchicalShardSyncerTest {
final List<Lease> currentLeases = Arrays.asList(newLease("shardId-3"), newLease("shardId-4"),
newLease("shardId-5"));
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
INITIAL_POSITION_TRIM_HORIZON);
final Map<String, Shard> shardIdToShardMap = HierarchicalShardSyncer.constructShardIdToShardMap(shards);
final Map<String, Set<String>> shardIdToChildShardIdsMap = HierarchicalShardSyncer
.constructShardIdToChildShardIdsMap(shardIdToShardMap);
final LeaseSynchronizer nonEmptyLeaseTableSynchronizer =
new NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap);
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(nonEmptyLeaseTableSynchronizer,
shards, currentLeases, INITIAL_POSITION_TRIM_HORIZON);
final Set<String> leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final List<ExtendedSequenceNumber> checkpoints = newLeases.stream().map(Lease::checkpoint)
@ -1229,8 +1183,15 @@ public class HierarchicalShardSyncerTest {
final List<Lease> currentLeases = Arrays.asList(newLease("shardId-4"), newLease("shardId-5"),
newLease("shardId-7"));
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
INITIAL_POSITION_TRIM_HORIZON);
final Map<String, Shard> shardIdToShardMap = HierarchicalShardSyncer.constructShardIdToShardMap(shards);
final Map<String, Set<String>> shardIdToChildShardIdsMap = HierarchicalShardSyncer
.constructShardIdToChildShardIdsMap(shardIdToShardMap);
final LeaseSynchronizer nonEmptyLeaseTableSynchronizer =
new NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap);
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(nonEmptyLeaseTableSynchronizer,
shards, currentLeases, INITIAL_POSITION_TRIM_HORIZON);
final Set<String> leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final List<ExtendedSequenceNumber> checkpoints = newLeases.stream().map(Lease::checkpoint)
@ -1259,8 +1220,15 @@ public class HierarchicalShardSyncerTest {
final List<Shard> shards = constructShardListForGraphB();
final List<Lease> currentLeases = new ArrayList<>();
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
INITIAL_POSITION_TRIM_HORIZON);
final Map<String, Shard> shardIdToShardMap = HierarchicalShardSyncer.constructShardIdToShardMap(shards);
final Map<String, Set<String>> shardIdToChildShardIdsMap = HierarchicalShardSyncer
.constructShardIdToChildShardIdsMap(shardIdToShardMap);
final LeaseSynchronizer nonEmptyLeaseTableSynchronizer =
new NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap);
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(nonEmptyLeaseTableSynchronizer,
shards, currentLeases, INITIAL_POSITION_TRIM_HORIZON);
final Set<String> leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final List<ExtendedSequenceNumber> checkpoints = newLeases.stream().map(Lease::checkpoint)
@ -1294,8 +1262,15 @@ public class HierarchicalShardSyncerTest {
final List<Lease> currentLeases = Arrays.asList(newLease("shardId-3"), newLease("shardId-4"),
newLease("shardId-5"));
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
INITIAL_POSITION_AT_TIMESTAMP);
final Map<String, Shard> shardIdToShardMap = HierarchicalShardSyncer.constructShardIdToShardMap(shards);
final Map<String, Set<String>> shardIdToChildShardIdsMap = HierarchicalShardSyncer
.constructShardIdToChildShardIdsMap(shardIdToShardMap);
final LeaseSynchronizer nonEmptyLeaseTableSynchronizer =
new NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap);
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(nonEmptyLeaseTableSynchronizer,
shards, currentLeases, INITIAL_POSITION_AT_TIMESTAMP);
final Set<String> leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final List<ExtendedSequenceNumber> checkpoints = newLeases.stream().map(Lease::checkpoint)
.collect(Collectors.toList());
@ -1328,8 +1303,15 @@ public class HierarchicalShardSyncerTest {
final List<Lease> currentLeases = Arrays.asList(newLease("shardId-4"), newLease("shardId-5"),
newLease("shardId-7"));
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
INITIAL_POSITION_AT_TIMESTAMP);
final Map<String, Shard> shardIdToShardMap = HierarchicalShardSyncer.constructShardIdToShardMap(shards);
final Map<String, Set<String>> shardIdToChildShardIdsMap = HierarchicalShardSyncer
.constructShardIdToChildShardIdsMap(shardIdToShardMap);
final LeaseSynchronizer nonEmptyLeaseTableSynchronizer =
new NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap);
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(nonEmptyLeaseTableSynchronizer,
shards, currentLeases, INITIAL_POSITION_AT_TIMESTAMP);
final Set<String> leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final List<ExtendedSequenceNumber> checkpoints = newLeases.stream().map(Lease::checkpoint)
.collect(Collectors.toList());
@ -1355,8 +1337,15 @@ public class HierarchicalShardSyncerTest {
final List<Shard> shards = constructShardListForGraphB();
final List<Lease> currentLeases = new ArrayList<>();
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
INITIAL_POSITION_AT_TIMESTAMP);
final Map<String, Shard> shardIdToShardMap = HierarchicalShardSyncer.constructShardIdToShardMap(shards);
final Map<String, Set<String>> shardIdToChildShardIdsMap = HierarchicalShardSyncer
.constructShardIdToChildShardIdsMap(shardIdToShardMap);
final LeaseSynchronizer nonEmptyLeaseTableSynchronizer =
new NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap);
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(nonEmptyLeaseTableSynchronizer,
shards, currentLeases, INITIAL_POSITION_AT_TIMESTAMP);
final Set<String> leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final List<ExtendedSequenceNumber> checkpoints = newLeases.stream().map(Lease::checkpoint)
.collect(Collectors.toList());

View file

@ -45,6 +45,7 @@ import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.LeaseSynchronizer;
import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.ShardObjectHelper;

View file

@ -33,7 +33,7 @@
</scm>
<properties>
<awssdk.version>2.10.66</awssdk.version>
<awssdk.version>2.10.65-SNAPSHOT</awssdk.version>
</properties>
<licenses>