Increased logging verbosity around lease management. Also included additional javadocs for methods (#1040)

Co-authored-by: Ryan Pelaez <rmpelaez@amazon.com>
This commit is contained in:
pelaezryan 2023-02-13 09:28:54 -08:00 committed by GitHub
parent 34f19c5a7b
commit 9fb58a22bf
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 106 additions and 27 deletions

View file

@ -202,6 +202,10 @@ class PeriodicShardSyncManager {
log.warn(
"Failed to submit shard sync task for stream {}. This could be due to the previous pending shard sync task.",
shardSyncTaskManager.shardDetector().streamIdentifier().streamName());
} else {
log.info("Submitted shard sync task for stream {} because of reason {}",
shardSyncTaskManager.shardDetector().streamIdentifier().streamName(),
shardSyncResponse.reasonForDecision());
}
} else {
log.info("Skipping shard sync for {} due to the reason - {}", streamConfigEntry.getKey(),
@ -222,6 +226,14 @@ class PeriodicShardSyncManager {
}
}
/**
* Retrieve all the streams, along with their associated leases
* @param streamIdentifiersToFilter
* @return
* @throws DependencyException
* @throws ProvisionedThroughputException
* @throws InvalidStateException
*/
private Map<StreamIdentifier, List<Lease>> getStreamToLeasesMap(
final Set<StreamIdentifier> streamIdentifiersToFilter)
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
@ -242,6 +254,13 @@ class PeriodicShardSyncManager {
}
}
/**
* Given a list of leases for a stream, determine if a shard sync is necessary.
* @param streamIdentifier
* @param leases
* @return
*/
@VisibleForTesting
ShardSyncResponse checkForShardSync(StreamIdentifier streamIdentifier, List<Lease> leases) {
if (CollectionUtils.isNullOrEmpty(leases)) {
@ -272,12 +291,24 @@ class PeriodicShardSyncManager {
}
}
/**
* Object containing metadata about the state of a shard sync
*/
@Value
@Accessors(fluent = true)
@VisibleForTesting
static class ShardSyncResponse {
/**
* Flag to determine if a shard sync is necessary or not
*/
private final boolean shouldDoShardSync;
private final boolean isHoleDetected;
/**
* Reason behind the state of 'shouldDoShardSync' flag
*/
private final String reasonForDecision;
}

View file

@ -331,8 +331,7 @@ public class Scheduler implements Runnable {
for (int i = 0; (!isDone) && (i < maxInitializationAttempts); i++) {
try {
log.info("Initialization attempt {}", (i + 1));
log.info("Initializing LeaseCoordinator");
log.info("Initializing LeaseCoordinator attempt {}", (i + 1));
leaseCoordinator.initialize();
if (!skipShardSyncAtWorkerInitializationIfLeasesExist || leaseRefresher.isLeaseTableEmpty()) {

View file

@ -148,17 +148,24 @@ public class HierarchicalShardSyncer {
new NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap);
final List<Lease> newLeasesToCreate = determineNewLeasesToCreate(leaseSynchronizer, latestShards, currentLeases,
initialPosition, inconsistentShardIds, multiStreamArgs);
log.debug("{} - Num new leases to create: {}", streamIdentifier, newLeasesToCreate.size());
log.info("{} - Number of new leases to create: {}", streamIdentifier, newLeasesToCreate.size());
final Set<Lease> createdLeases = new HashSet<>();
for (Lease lease : newLeasesToCreate) {
long startTime = System.currentTimeMillis();
boolean success = false;
try {
leaseRefresher.createLeaseIfNotExists(lease);
if(leaseRefresher.createLeaseIfNotExists(lease)) {
createdLeases.add(lease);
}
success = true;
} finally {
}
finally {
MetricsUtil.addSuccessAndLatency(scope, "CreateLease", success, startTime, MetricsLevel.DETAILED);
}
}
log.info("{} - Newly created leases {}: {}", streamIdentifier, createdLeases.size(), createdLeases);
final List<Lease> trackedLeases = new ArrayList<>(currentLeases);
trackedLeases.addAll(newLeasesToCreate);
return true;
@ -398,6 +405,7 @@ public class HierarchicalShardSyncer {
isDescendant = true;
// We don't need to add leases of its ancestors,
// because we'd have done it when creating a lease for this shard.
log.debug("{} - Shard {} is a descendant shard of an existing shard. Skipping lease creation", streamIdentifier, shardId);
} else {
final Shard shard = shardIdToShardMapOfAllKinesisShards.get(shardId);
@ -474,9 +482,12 @@ public class HierarchicalShardSyncer {
if (descendantParentShardIds.contains(parentShardId)
&& !initialPosition.getInitialPositionInStream()
.equals(InitialPositionInStream.AT_TIMESTAMP)) {
log.info("Setting Lease '{}' checkpoint to 'TRIM_HORIZON'. Checkpoint was previously set to {}", lease.leaseKey(), lease.checkpoint());
lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
} else {
lease.checkpoint(convertToCheckpoint(initialPosition));
final ExtendedSequenceNumber newCheckpoint = convertToCheckpoint(initialPosition);
log.info("Setting Lease '{}' checkpoint to '{}'. Checkpoint was previously set to {}", lease.leaseKey(), newCheckpoint, lease.checkpoint());
lease.checkpoint(newCheckpoint);
}
}
}
@ -512,7 +523,7 @@ public class HierarchicalShardSyncer {
* Helper method to get parent shardIds of the current shard - includes the parent shardIds if:
* a/ they are not null
* b/ if they exist in the current shard map (i.e. haven't expired)
*
*
* @param shard Will return parents of this shard
* @param shardIdToShardMapOfAllKinesisShards ShardId->Shard map containing all shards obtained via DescribeStream.
* @return Set of parentShardIds
@ -538,6 +549,12 @@ public class HierarchicalShardSyncer {
: newKCLLeaseForChildShard(childShard);
}
/**
* Generate a lease object for the given Child Shard. Checkpoint is set to TRIM_HORIZON
* @param childShard Shard for which a lease should be created
* @return Lease for the shard
* @throws InvalidStateException If the child shard has no parent shards
*/
private static Lease newKCLLeaseForChildShard(final ChildShard childShard) throws InvalidStateException {
Lease newLease = new Lease();
newLease.leaseKey(childShard.shardId());
@ -571,7 +588,7 @@ public class HierarchicalShardSyncer {
/**
* Helper method to create a new Lease POJO for a shard.
* Note: Package level access only for testing purposes
*
*
* @param shard
* @return
*/
@ -611,7 +628,7 @@ public class HierarchicalShardSyncer {
/**
* Helper method to construct a shardId->Shard map for the specified list of shards.
*
*
* @param shards List of shards
* @return ShardId->Shard map
*/
@ -622,7 +639,7 @@ public class HierarchicalShardSyncer {
/**
* Helper method to return all the open shards for a stream.
* Note: Package level access only for testing purposes.
*
*
* @param allShards All shards returved via DescribeStream. We assume this to represent a consistent shard list.
* @return List of open shards (shards at the tip of the stream) - may include shards that are not yet active.
*/
@ -633,7 +650,7 @@ public class HierarchicalShardSyncer {
private static ExtendedSequenceNumber convertToCheckpoint(final InitialPositionInStreamExtended position) {
ExtendedSequenceNumber checkpoint = null;
if (position.getInitialPositionInStream().equals(InitialPositionInStream.TRIM_HORIZON)) {
checkpoint = ExtendedSequenceNumber.TRIM_HORIZON;
} else if (position.getInitialPositionInStream().equals(InitialPositionInStream.LATEST)) {
@ -641,7 +658,7 @@ public class HierarchicalShardSyncer {
} else if (position.getInitialPositionInStream().equals(InitialPositionInStream.AT_TIMESTAMP)) {
checkpoint = ExtendedSequenceNumber.AT_TIMESTAMP;
}
return checkpoint;
}
@ -688,7 +705,7 @@ public class HierarchicalShardSyncer {
* We assume that lease1 and lease2 are:
* a/ not null,
* b/ shards (if found) have non-null starting sequence numbers
*
*
* {@inheritDoc}
*/
@Override
@ -698,18 +715,18 @@ public class HierarchicalShardSyncer {
final String shardId2 = shardIdFromLeaseDeducer.apply(lease2, multiStreamArgs);
final Shard shard1 = shardIdToShardMap.get(shardId1);
final Shard shard2 = shardIdToShardMap.get(shardId2);
// If we found shards for the two leases, use comparison of the starting sequence numbers
if (shard1 != null && shard2 != null) {
BigInteger sequenceNumber1 = new BigInteger(shard1.sequenceNumberRange().startingSequenceNumber());
BigInteger sequenceNumber2 = new BigInteger(shard2.sequenceNumberRange().startingSequenceNumber());
result = sequenceNumber1.compareTo(sequenceNumber2);
result = sequenceNumber1.compareTo(sequenceNumber2);
}
if (result == 0) {
result = shardId1.compareTo(shardId2);
}
return result;
}
}

View file

@ -145,10 +145,18 @@ public class LeaseCleanupManager {
return deletionQueue.size();
}
/**
*
* @return true if the 'Completed Lease Stopwatch' has elapsed more time than the 'Completed Lease Cleanup Interval'
*/
private boolean timeToCheckForCompletedShard() {
return completedLeaseStopwatch.elapsed(TimeUnit.MILLISECONDS) >= completedLeaseCleanupIntervalMillis;
}
/**
*
* @return true if the 'Garbage Lease Stopwatch' has elapsed more time than the 'Garbage Lease Cleanup Interval'
*/
private boolean timeToCheckForGarbageShard() {
return garbageLeaseStopwatch.elapsed(TimeUnit.MILLISECONDS) >= garbageLeaseCleanupIntervalMillis;
}
@ -230,6 +238,15 @@ public class LeaseCleanupManager {
return true;
}
/**
* Check if the all of the parent shards for a given lease have an ongoing lease. If any one parent still has a lease, return false. Otherwise return true
* @param lease
* @param shardInfo
* @return
* @throws DependencyException
* @throws ProvisionedThroughputException
* @throws InvalidStateException
*/
private boolean allParentShardLeasesDeleted(Lease lease, ShardInfo shardInfo) throws DependencyException, ProvisionedThroughputException, InvalidStateException {
for (String parentShard : lease.parentShardIds()) {
final Lease parentLease = leaseCoordinator.leaseRefresher().getLease(ShardInfo.getLeaseKey(shardInfo, parentShard));

View file

@ -169,7 +169,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
request = CreateTableRequest.builder().tableName(table).keySchema(serializer.getKeySchema())
.attributeDefinitions(serializer.getAttributeDefinitions())
.billingMode(billingMode).build();
}else{
} else {
request = CreateTableRequest.builder().tableName(table).keySchema(serializer.getKeySchema())
.attributeDefinitions(serializer.getAttributeDefinitions()).provisionedThroughput(throughput)
.build();
@ -429,7 +429,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
@Override
public boolean createLeaseIfNotExists(@NonNull final Lease lease)
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
log.debug("Creating lease {}", lease);
log.debug("Creating lease: {}", lease);
PutItemRequest request = PutItemRequest.builder().tableName(table).item(serializer.toDynamoRecord(lease))
.expected(serializer.getDynamoNonexistantExpectation()).build();
@ -452,6 +452,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
} catch (DynamoDbException | TimeoutException e) {
throw convertAndRethrowExceptions("create", lease.leaseKey(), e);
}
log.info("Created lease: {}",lease);
return true;
}
@ -476,7 +477,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
return null;
} else {
final Lease lease = serializer.fromDynamoRecord(dynamoRecord);
log.debug("Got lease {}", lease);
log.debug("Retrieved lease: {}", lease);
return lease;
}
} catch (ExecutionException e) {
@ -535,6 +536,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
}
lease.leaseCounter(lease.leaseCounter() + 1);
log.debug("Renewed lease with key {}", lease.leaseKey());
return true;
}
@ -582,6 +584,8 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
lease.ownerSwitchesSinceCheckpoint(lease.ownerSwitchesSinceCheckpoint() + 1);
}
log.info("Transferred lease {} ownership from {} to {}", lease.leaseKey(), oldOwner, owner);
return true;
}
@ -620,6 +624,8 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
lease.leaseOwner(null);
lease.leaseCounter(lease.leaseCounter() + 1);
log.info("Evicted lease with leaseKey {}", lease.leaseKey());
return true;
}
@ -648,6 +654,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
} catch (DynamoDbException | TimeoutException e) {
throw convertAndRethrowExceptions("deleteAll", lease.leaseKey(), e);
}
log.debug("Deleted lease {} from table {}", lease.leaseKey(), table);
}
}
@ -675,6 +682,8 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
} catch (DynamoDbException | TimeoutException e) {
throw convertAndRethrowExceptions("delete", lease.leaseKey(), e);
}
log.info("Deleted lease with leaseKey {}", lease.leaseKey());
}
/**
@ -683,7 +692,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
@Override
public boolean updateLease(@NonNull final Lease lease)
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
log.debug("Updating lease {}", lease);
log.debug("Updating lease: {}", lease);
final AWSExceptionManager exceptionManager = createExceptionManager();
exceptionManager.add(ConditionalCheckFailedException.class, t -> t);
@ -711,6 +720,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
}
lease.leaseCounter(lease.leaseCounter() + 1);
log.info("Updated lease {}.", lease.leaseKey());
return true;
}
@ -738,6 +748,8 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
} catch (DynamoDbException | TimeoutException e) {
throw convertAndRethrowExceptions("update", lease.leaseKey(), e);
}
log.info("Updated lease without expectation {}.", lease);
}
/**

View file

@ -242,7 +242,7 @@ public class DynamoDBLeaseRenewer implements LeaseRenewer {
/**
* Internal method to return a lease with a specific lease key only if we currently hold it.
*
*
* @param leaseKey key of lease to return
* @param now current timestamp for old-ness checking
* @return non-authoritative copy of the held lease, or null if we don't currently hold it
@ -309,6 +309,7 @@ public class DynamoDBLeaseRenewer implements LeaseRenewer {
long startTime = System.currentTimeMillis();
boolean success = false;
try {
log.info("Updating lease from {} to {}", authoritativeLease, lease);
synchronized (authoritativeLease) {
authoritativeLease.update(lease);
boolean updatedLease = leaseRefresher.updateLease(authoritativeLease);
@ -325,7 +326,7 @@ public class DynamoDBLeaseRenewer implements LeaseRenewer {
/*
* Remove only if the value currently in the map is the same as the authoritative lease. We're
* guarding against a pause after the concurrency token check above. It plays out like so:
*
*
* 1) Concurrency token check passes
* 2) Pause. Lose lease, re-acquire lease. This requires at least one lease counter update.
* 3) Unpause. leaseRefresher.updateLease fails conditional write due to counter updates, returns
@ -333,7 +334,7 @@ public class DynamoDBLeaseRenewer implements LeaseRenewer {
* 4) ownedLeases.remove(key, value) doesn't do anything because authoritativeLease does not
* .equals() the re-acquired version in the map on the basis of lease counter. This is what we want.
* If we just used ownedLease.remove(key), we would have pro-actively removed a lease incorrectly.
*
*
* Note that there is a subtlety here - Lease.equals() deliberately does not check the concurrency
* token, but it does check the lease counter, so this scheme works.
*/

View file

@ -259,6 +259,7 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
leasesToTake = leasesToTake.stream().map(lease -> {
if (lease.isMarkedForLeaseSteal()) {
try {
log.debug("Updating stale lease {}.", lease.leaseKey());
return leaseRefresher.getLease(lease.leaseKey());
} catch (DependencyException | InvalidStateException | ProvisionedThroughputException e) {
log.warn("Failed to fetch latest state of the lease {} that needs to be stolen, "
@ -408,7 +409,7 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
target = 1;
} else {
/*
* numWorkers must be < numLeases.
* if we have made it here, it means there are more leases than workers
*
* Our target for each worker is numLeases / numWorkers (+1 if numWorkers doesn't evenly divide numLeases)
*/

View file

@ -267,13 +267,14 @@ public class ShutdownTask implements ConsumerTask {
}
}
}
// Attempt create leases for child shards.
for(ChildShard childShard : childShards) {
final String leaseKey = ShardInfo.getLeaseKey(shardInfo, childShard.shardId());
if(leaseCoordinator.leaseRefresher().getLease(leaseKey) == null) {
log.debug("{} - Shard {} - Attempting to create lease for child shard {}", shardDetector.streamIdentifier(), shardInfo.shardId(), leaseKey);
final Lease leaseToCreate = hierarchicalShardSyncer.createLeaseForChildShard(childShard, shardDetector.streamIdentifier());
leaseCoordinator.leaseRefresher().createLeaseIfNotExists(leaseToCreate);
log.info("Shard {}: Created child shard lease: {}", shardInfo.shardId(), leaseToCreate.leaseKey());
log.info("{} - Shard {}: Created child shard lease: {}", shardDetector.streamIdentifier(), shardInfo.shardId(), leaseToCreate);
}
}
}