KinesisShardSyncer only creates leases for one level of leases (#49)

Co-authored-by: Joshua Kim <kimjos@amazon.com>
This commit is contained in:
Joshua Kim 2020-07-06 14:50:21 -04:00 committed by GitHub
parent 3a88a60a4e
commit 0760688375
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 1376 additions and 323 deletions

View file

@ -31,6 +31,7 @@ import com.amazonaws.services.kinesis.leases.impl.LeaseManager;
import com.amazonaws.services.kinesis.model.ShardFilter; import com.amazonaws.services.kinesis.model.ShardFilter;
import com.amazonaws.services.kinesis.model.ShardFilterType; import com.amazonaws.services.kinesis.model.ShardFilterType;
import com.amazonaws.util.CollectionUtils; import com.amazonaws.util.CollectionUtils;
import lombok.NoArgsConstructor;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
@ -457,7 +458,7 @@ class KinesisShardSyncer implements ShardSyncer {
/** /**
* Note: Package level access for testing purposes only. * Note: Package level access for testing purposes only.
* Check if this shard is a descendant of a shard that is (or will be) processed. * Check if this shard is a descendant of a shard that is (or will be) processed.
* Create leases for the ancestors of this shard as required. * Create leases for the first ancestor of this shard that needs to be processed, as required.
* See javadoc of determineNewLeasesToCreate() for rules and example. * See javadoc of determineNewLeasesToCreate() for rules and example.
* *
* @param shardId The shardId to check. * @param shardId The shardId to check.
@ -473,9 +474,10 @@ class KinesisShardSyncer implements ShardSyncer {
static boolean checkIfDescendantAndAddNewLeasesForAncestors(String shardId, static boolean checkIfDescendantAndAddNewLeasesForAncestors(String shardId,
InitialPositionInStreamExtended initialPosition, Set<String> shardIdsOfCurrentLeases, InitialPositionInStreamExtended initialPosition, Set<String> shardIdsOfCurrentLeases,
Map<String, Shard> shardIdToShardMapOfAllKinesisShards, Map<String, Shard> shardIdToShardMapOfAllKinesisShards,
Map<String, KinesisClientLease> shardIdToLeaseMapOfNewShards, Map<String, Boolean> memoizationContext) { Map<String, KinesisClientLease> shardIdToLeaseMapOfNewShards, MemoizationContext memoizationContext) {
final Boolean previousValue = memoizationContext.isDescendant(shardId);
Boolean previousValue = memoizationContext.get(shardId);
if (previousValue != null) { if (previousValue != null) {
return previousValue; return previousValue;
} }
@ -495,10 +497,13 @@ class KinesisShardSyncer implements ShardSyncer {
shard = shardIdToShardMapOfAllKinesisShards.get(shardId); shard = shardIdToShardMapOfAllKinesisShards.get(shardId);
parentShardIds = getParentShardIds(shard, shardIdToShardMapOfAllKinesisShards); parentShardIds = getParentShardIds(shard, shardIdToShardMapOfAllKinesisShards);
for (String parentShardId : parentShardIds) { for (String parentShardId : parentShardIds) {
// Check if the parent is a descendant, and include its ancestors. // Check if the parent is a descendant, and include its ancestors. Or, if the parent is NOT a
if (checkIfDescendantAndAddNewLeasesForAncestors(parentShardId, initialPosition, // descendant but we should create a lease for it anyway (e.g. to include in processing from
shardIdsOfCurrentLeases, shardIdToShardMapOfAllKinesisShards, shardIdToLeaseMapOfNewShards, // TRIM_HORIZON or AT_TIMESTAMP). If either is true, then we mark the current shard as a descendant.
memoizationContext)) { final boolean isParentDescendant = checkIfDescendantAndAddNewLeasesForAncestors(parentShardId,
initialPosition, shardIdsOfCurrentLeases, shardIdToShardMapOfAllKinesisShards,
shardIdToLeaseMapOfNewShards, memoizationContext);
if (isParentDescendant || memoizationContext.shouldCreateLease(parentShardId)) {
isDescendant = true; isDescendant = true;
descendantParentShardIds.add(parentShardId); descendantParentShardIds.add(parentShardId);
LOG.debug("Parent shard " + parentShardId + " is a descendant."); LOG.debug("Parent shard " + parentShardId + " is a descendant.");
@ -511,37 +516,76 @@ class KinesisShardSyncer implements ShardSyncer {
if (isDescendant) { if (isDescendant) {
for (String parentShardId : parentShardIds) { for (String parentShardId : parentShardIds) {
if (!shardIdsOfCurrentLeases.contains(parentShardId)) { if (!shardIdsOfCurrentLeases.contains(parentShardId)) {
LOG.debug("Need to create a lease for shardId " + parentShardId);
KinesisClientLease lease = shardIdToLeaseMapOfNewShards.get(parentShardId); KinesisClientLease lease = shardIdToLeaseMapOfNewShards.get(parentShardId);
// If the lease for the parent shard does not already exist, there are two cases in which we
// would want to create it:
// - If we have already marked the parentShardId for lease creation in a prior recursive
// call. This could happen if we are trying to process from TRIM_HORIZON or AT_TIMESTAMP.
// - If the parent shard is not a descendant but the current shard is a descendant, then
// the parent shard is the oldest shard in the shard hierarchy that does not have an
// ancestor in the lease table (the adjacent parent is necessarily a descendant, and
// therefore covered in the lease table). So we should create a lease for the parent.
if (lease == null) { if (lease == null) {
lease = newKCLLease(shardIdToShardMapOfAllKinesisShards.get(parentShardId)); if (memoizationContext.shouldCreateLease(parentShardId) ||
shardIdToLeaseMapOfNewShards.put(parentShardId, lease); !descendantParentShardIds.contains(parentShardId)) {
LOG.debug("Need to create a lease for shardId " + parentShardId);
lease = newKCLLease(shardIdToShardMapOfAllKinesisShards.get(parentShardId));
shardIdToLeaseMapOfNewShards.put(parentShardId, lease);
}
} }
if (descendantParentShardIds.contains(parentShardId) && !initialPosition /**
.getInitialPositionInStream().equals(InitialPositionInStream.AT_TIMESTAMP)) { * If the shard is a descendant and the specified initial position is AT_TIMESTAMP, then the
lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON); * checkpoint should be set to AT_TIMESTAMP, else to TRIM_HORIZON. For AT_TIMESTAMP, we will
} else { * add a lease just like we do for TRIM_HORIZON. However we will only return back records
lease.setCheckpoint(convertToCheckpoint(initialPosition)); * with server-side timestamp at or after the specified initial position timestamp.
*
* Shard structure (each level depicts a stream segment):
* 0 1 2 3 4 5 - shards till epoch 102
* \ / \ / | |
* 6 7 4 5 - shards from epoch 103 - 205
* \ / | /\
* 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
*
* Current leases: (4, 5, 7)
*
* For the above example, suppose the initial position in stream is set to AT_TIMESTAMP with
* timestamp value 206. We will then create new leases for all the shards 0 and 1 (with
* checkpoint set AT_TIMESTAMP), even though these ancestor shards have an epoch less than
* 206. However as we begin processing the ancestor shards, their checkpoints would be
* updated to SHARD_END and their leases would then be deleted since they won't have records
* with server-side timestamp at/after 206. And after that we will begin processing the
* descendant shards with epoch at/after 206 and we will return the records that meet the
* timestamp requirement for these shards.
*/
if (lease != null) {
if (descendantParentShardIds.contains(parentShardId) && !initialPosition
.getInitialPositionInStream().equals(InitialPositionInStream.AT_TIMESTAMP)) {
lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON);
} else {
lease.setCheckpoint(convertToCheckpoint(initialPosition));
}
} }
} }
} }
} else { } else {
// This shard should be included, if the customer wants to process all records in the stream or // This shard is not a descendant, but should still be included if the customer wants to process all
// if the initial position is AT_TIMESTAMP. For AT_TIMESTAMP, we will add a lease just like we do // records in the stream or if the initial position is AT_TIMESTAMP. For AT_TIMESTAMP, we will add a
// for TRIM_HORIZON. However we will only return back records with server-side timestamp at or // lease just like we do for TRIM_HORIZON. However we will only return back records with server-side
// after the specified initial position timestamp. // timestamp at or after the specified initial position timestamp.
if (initialPosition.getInitialPositionInStream().equals(InitialPositionInStream.TRIM_HORIZON) if (initialPosition.getInitialPositionInStream().equals(InitialPositionInStream.TRIM_HORIZON)
|| initialPosition.getInitialPositionInStream() || initialPosition.getInitialPositionInStream()
.equals(InitialPositionInStream.AT_TIMESTAMP)) { .equals(InitialPositionInStream.AT_TIMESTAMP)) {
isDescendant = true; memoizationContext.setShouldCreateLease(shardId, true);
} }
} }
} }
} }
memoizationContext.put(shardId, isDescendant); memoizationContext.setIsDescendant(shardId, isDescendant);
return isDescendant; return isDescendant;
} }
// CHECKSTYLE:ON CyclomaticComplexity // CHECKSTYLE:ON CyclomaticComplexity
@ -834,4 +878,28 @@ class KinesisShardSyncer implements ShardSyncer {
} }
/**
* Helper class to pass around state between recursive traversals of shard hierarchy.
*/
@NoArgsConstructor
static class MemoizationContext {
private Map<String, Boolean> isDescendantMap = new HashMap<>();
private Map<String, Boolean> shouldCreateLeaseMap = new HashMap<>();
Boolean isDescendant(String shardId) {
return isDescendantMap.get(shardId);
}
void setIsDescendant(String shardId, Boolean isDescendant) {
isDescendantMap.put(shardId, isDescendant);
}
Boolean shouldCreateLease(String shardId) {
return shouldCreateLeaseMap.computeIfAbsent(shardId, x -> Boolean.FALSE);
}
void setShouldCreateLease(String shardId, Boolean shouldCreateLease) {
shouldCreateLeaseMap.put(shardId, shouldCreateLease);
}
}
} }

View file

@ -47,8 +47,10 @@ class NonEmptyLeaseTableSynchronizer implements LeaseSynchronizer {
* Note: Package level access only for testing purposes. * Note: Package level access only for testing purposes.
* *
* For each open (no ending sequence number) shard without open parents that doesn't already have a lease, * For each open (no ending sequence number) shard without open parents that doesn't already have a lease,
* determine if it is a descendent of any shard which is or will be processed (e.g. for which a lease exists): * determine if it is a descendant of any shard which is or will be processed (e.g. for which a lease exists):
* If so, set checkpoint of the shard to TrimHorizon and also create leases for ancestors if needed. * If so, create a lease for the first ancestor that needs to be processed (if needed). We will create leases
* for no more than one level in the ancestry tree. Once we find the first ancestor that needs to be processed,
* we will avoid creating leases for further descendants of that ancestor.
* If not, set checkpoint of the shard to the initial position specified by the client. * If not, set checkpoint of the shard to the initial position specified by the client.
* To check if we need to create leases for ancestors, we use the following rules: * To check if we need to create leases for ancestors, we use the following rules:
* * If we began (or will begin) processing data for a shard, then we must reach end of that shard before * * If we began (or will begin) processing data for a shard, then we must reach end of that shard before
@ -67,10 +69,17 @@ class NonEmptyLeaseTableSynchronizer implements LeaseSynchronizer {
* 0 1 2 3 4 5 - shards till epoch 102 * 0 1 2 3 4 5 - shards till epoch 102
* \ / \ / | | * \ / \ / | |
* 6 7 4 5 - shards from epoch 103 - 205 * 6 7 4 5 - shards from epoch 103 - 205
* \ / | / \ * \ / | / \
* 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
* Current leases: (3, 4, 5) *
* New leases to create: (2, 6, 7, 8, 9, 10) * Current leases: (4, 5, 7)
*
* If initial position is LATEST:
* - New leases to create: (6)
* If initial position is TRIM_HORIZON:
* - New leases to create: (0, 1)
* If initial position is AT_TIMESTAMP(epoch=200):
* - New leases to create: (0, 1)
* *
* The leases returned are sorted by the starting sequence number - following the same order * The leases returned are sorted by the starting sequence number - following the same order
* when persisting the leases in DynamoDB will ensure that we recover gracefully if we fail * when persisting the leases in DynamoDB will ensure that we recover gracefully if we fail
@ -104,7 +113,8 @@ class NonEmptyLeaseTableSynchronizer implements LeaseSynchronizer {
} }
List<Shard> openShards = KinesisShardSyncer.getOpenShards(shards); List<Shard> openShards = KinesisShardSyncer.getOpenShards(shards);
Map<String, Boolean> memoizationContext = new HashMap<>(); final KinesisShardSyncer.MemoizationContext memoizationContext = new KinesisShardSyncer.MemoizationContext();
// Iterate over the open shards and find those that don't have any lease entries. // Iterate over the open shards and find those that don't have any lease entries.
for (Shard shard : openShards) { for (Shard shard : openShards) {
@ -115,43 +125,30 @@ class NonEmptyLeaseTableSynchronizer implements LeaseSynchronizer {
} else if (inconsistentShardIds.contains(shardId)) { } else if (inconsistentShardIds.contains(shardId)) {
LOG.info("shardId " + shardId + " is an inconsistent child. Not creating a lease"); LOG.info("shardId " + shardId + " is an inconsistent child. Not creating a lease");
} else { } else {
LOG.debug("Need to create a lease for shardId " + shardId); LOG.debug("Beginning traversal of ancestry tree for shardId " + shardId);
KinesisClientLease newLease = KinesisShardSyncer.newKCLLease(shard);
// A shard is a descendant if at least one if its ancestors exists in the lease table.
// We will create leases for only one level in the ancestry tree. Once we find the first ancestor
// that needs to be processed in order to complete the hash range, we will not create leases for
// further descendants of that ancestor.
boolean isDescendant = KinesisShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(shardId, boolean isDescendant = KinesisShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(shardId,
initialPosition, shardIdsOfCurrentLeases, shardIdToShardMapOfAllKinesisShards, initialPosition, shardIdsOfCurrentLeases, shardIdToShardMapOfAllKinesisShards,
shardIdToNewLeaseMap, memoizationContext); shardIdToNewLeaseMap, memoizationContext);
/** // If shard is a descendant, the leases for its ancestors were already created above. Open shards
* If the shard is a descendant and the specified initial position is AT_TIMESTAMP, then the // that are NOT descendants will not have leases yet, so we create them here. We will not create
* checkpoint should be set to AT_TIMESTAMP, else to TRIM_HORIZON. For AT_TIMESTAMP, we will add a // leases for open shards that ARE descendants yet - leases for these shards will be created upon
* lease just like we do for TRIM_HORIZON. However we will only return back records with server-side // SHARD_END of their parents.
* timestamp at or after the specified initial position timestamp. if (!isDescendant) {
* LOG.debug("ShardId " + shardId + " has no ancestors. Creating a lease.");
* Shard structure (each level depicts a stream segment): final KinesisClientLease newLease = KinesisShardSyncer.newKCLLease(shard);
* 0 1 2 3 4 5 - shards till epoch 102
* \ / \ / | |
* 6 7 4 5 - shards from epoch 103 - 205
* \ / | /\
* 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
*
* Current leases: empty set
*
* For the above example, suppose the initial position in stream is set to AT_TIMESTAMP with
* timestamp value 206. We will then create new leases for all the shards (with checkpoint set to
* AT_TIMESTAMP), including the ancestor shards with epoch less than 206. However as we begin
* processing the ancestor shards, their checkpoints would be updated to SHARD_END and their leases
* would then be deleted since they won't have records with server-side timestamp at/after 206. And
* after that we will begin processing the descendant shards with epoch at/after 206 and we will
* return the records that meet the timestamp requirement for these shards.
*/
if (isDescendant && !initialPosition.getInitialPositionInStream()
.equals(InitialPositionInStream.AT_TIMESTAMP)) {
newLease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON);
} else {
newLease.setCheckpoint(KinesisShardSyncer.convertToCheckpoint(initialPosition)); newLease.setCheckpoint(KinesisShardSyncer.convertToCheckpoint(initialPosition));
LOG.debug("Set checkpoint of " + newLease.getLeaseKey() + " to " + newLease.getCheckpoint());
shardIdToNewLeaseMap.put(shardId, newLease);
} else {
LOG.debug("ShardId " + shardId + " is a descendant whose ancestors should already have leases. " +
"Not creating a lease.");
} }
LOG.debug("Set checkpoint of " + newLease.getLeaseKey() + " to " + newLease.getCheckpoint());
shardIdToNewLeaseMap.put(shardId, newLease);
} }
} }

View file

@ -225,7 +225,7 @@ class ExceptionThrowingLeaseManager implements ILeaseManager<KinesisClientLease>
@Override @Override
public boolean isLeaseTableEmpty() throws DependencyException, public boolean isLeaseTableEmpty() throws DependencyException,
InvalidStateException, ProvisionedThroughputException { InvalidStateException, ProvisionedThroughputException {
return false; return leaseManager.listLeases().isEmpty();
} }
} }