Making LeaseCleanupManager more verbose about lease cleanup failures
This commit is contained in:
parent
9cb5020022
commit
76234d172c
1 changed files with 40 additions and 21 deletions
|
|
@ -176,6 +176,7 @@ public class LeaseCleanupManager {
|
|||
boolean alreadyCheckedForGarbageCollection = false;
|
||||
boolean wereChildShardsPresent = false;
|
||||
boolean wasResourceNotFound = false;
|
||||
String cleanupFailureReason = "";
|
||||
|
||||
try {
|
||||
if (cleanupLeasesUponShardCompletion && timeToCheckForCompletedShard) {
|
||||
|
|
@ -184,53 +185,55 @@ public class LeaseCleanupManager {
|
|||
Set<String> childShardKeys = leaseFromDDB.getChildShardIds();
|
||||
if (CollectionUtils.isNullOrEmpty(childShardKeys)) {
|
||||
try {
|
||||
// throws ResourceNotFoundException
|
||||
childShardKeys = getChildShardsFromService(shardInfo);
|
||||
|
||||
if (CollectionUtils.isNullOrEmpty(childShardKeys)) {
|
||||
LOG.error("No child shards returned from service for shard " + shardInfo.getShardId());
|
||||
// If no children shard is found in DDB and from service, then do not delete the lease
|
||||
throw new InvalidStateException("No child shards found for this supposedly " +
|
||||
"closed shard in both local DDB and in service " + shardInfo.getShardId());
|
||||
|
||||
} else {
|
||||
wereChildShardsPresent = true;
|
||||
updateLeaseWithChildShards(leasePendingDeletion, childShardKeys);
|
||||
}
|
||||
} catch (ResourceNotFoundException e) {
|
||||
throw e;
|
||||
} finally {
|
||||
// We rely on resource presence in service for garbage collection. Since we already
|
||||
// made a call to getChildShardsFromService we would be coming to know if the resource
|
||||
// is present of not. In latter case, we would throw ResourceNotFoundException, which is
|
||||
// handled in catch block.
|
||||
alreadyCheckedForGarbageCollection = true;
|
||||
}
|
||||
} else {
|
||||
wereChildShardsPresent = true;
|
||||
}
|
||||
try {
|
||||
cleanedUpCompletedLease = cleanupLeaseForCompletedShard(lease, childShardKeys);
|
||||
final CompletedShardResult completedShardResult = cleanupLeaseForCompletedShard(lease, childShardKeys);
|
||||
cleanedUpCompletedLease = completedShardResult.cleanedUp();
|
||||
cleanupFailureReason = completedShardResult.failureMsg();
|
||||
} catch (Exception e) {
|
||||
// Suppressing the exception here, so that we can attempt for garbage cleanup.
|
||||
LOG.warn("Unable to cleanup lease for shard " + shardInfo.getShardId());
|
||||
LOG.warn("Unable to cleanup lease for shard " + shardInfo.getShardId() + " due to " + e.getMessage());
|
||||
}
|
||||
} else {
|
||||
LOG.info("Lease not present in lease table while cleaning the shard " + shardInfo.getShardId());
|
||||
cleanedUpCompletedLease = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (!alreadyCheckedForGarbageCollection && timeToCheckForGarbageShard) {
|
||||
try {
|
||||
wereChildShardsPresent = !CollectionUtils
|
||||
if (!cleanedUpCompletedLease && !alreadyCheckedForGarbageCollection && timeToCheckForGarbageShard) {
|
||||
// throws ResourceNotFoundException
|
||||
wereChildShardsPresent = !CollectionUtils
|
||||
.isNullOrEmpty(getChildShardsFromService(shardInfo));
|
||||
} catch (ResourceNotFoundException e) {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
} catch (ResourceNotFoundException e) {
|
||||
wasResourceNotFound = true;
|
||||
cleanedUpGarbageLease = cleanupLeaseForGarbageShard(lease);
|
||||
cleanupFailureReason = cleanedUpGarbageLease ? "" : "DDB Lease Deletion Failed";
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Unable to cleanup lease for shard " + shardInfo.getShardId() + " : " + e.getMessage());
|
||||
cleanupFailureReason = e.getMessage();
|
||||
}
|
||||
|
||||
return new LeaseCleanupResult(cleanedUpCompletedLease, cleanedUpGarbageLease, wereChildShardsPresent,
|
||||
wasResourceNotFound);
|
||||
wasResourceNotFound, cleanupFailureReason);
|
||||
}
|
||||
|
||||
private Set<String> getChildShardsFromService(ShardInfo shardInfo) {
|
||||
|
|
@ -238,12 +241,16 @@ public class LeaseCleanupManager {
|
|||
return kinesisProxy.get(iterator, maxRecords).getChildShards().stream().map(c -> c.getShardId()).collect(Collectors.toSet());
|
||||
}
|
||||
|
||||
|
||||
// A lease that ended with SHARD_END from ResourceNotFoundException is safe to delete if it no longer exists in the
|
||||
// stream (known explicitly from ResourceNotFound being thrown when processing this shard),
|
||||
private boolean cleanupLeaseForGarbageShard(KinesisClientLease lease) throws DependencyException, ProvisionedThroughputException, InvalidStateException {
|
||||
LOG.info("Deleting lease " + lease.getLeaseKey() + " as it is not present in the stream.");
|
||||
leaseManager.deleteLease(lease);
|
||||
try {
|
||||
leaseManager.deleteLease(lease);
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Lease deletion failed for " + lease.getLeaseKey() + " due to " + e.getMessage());
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
@ -263,8 +270,9 @@ public class LeaseCleanupManager {
|
|||
// We should only be deleting the current shard's lease if
|
||||
// 1. All of its children are currently being processed, i.e their checkpoint is not TRIM_HORIZON or AT_TIMESTAMP.
|
||||
// 2. Its parent shard lease(s) have already been deleted.
|
||||
private boolean cleanupLeaseForCompletedShard(KinesisClientLease lease, Set<String> childShardLeaseKeys)
|
||||
private CompletedShardResult cleanupLeaseForCompletedShard(KinesisClientLease lease, Set<String> childShardLeaseKeys)
|
||||
throws DependencyException, ProvisionedThroughputException, InvalidStateException, IllegalStateException {
|
||||
|
||||
final Set<String> processedChildShardLeaseKeys = new HashSet<>();
|
||||
|
||||
for (String childShardLeaseKey : childShardLeaseKeys) {
|
||||
|
|
@ -280,14 +288,17 @@ public class LeaseCleanupManager {
|
|||
}
|
||||
}
|
||||
|
||||
if (!allParentShardLeasesDeleted(lease) || !Objects.equals(childShardLeaseKeys, processedChildShardLeaseKeys)) {
|
||||
return false;
|
||||
boolean parentShardsDeleted = allParentShardLeasesDeleted(lease);
|
||||
boolean childrenStartedProcessing = Objects.equals(childShardLeaseKeys, processedChildShardLeaseKeys);
|
||||
|
||||
if (!parentShardsDeleted || !childrenStartedProcessing) {
|
||||
return new CompletedShardResult(false, !parentShardsDeleted ? "Parent shard(s) not deleted yet" : "Child shard(s) yet to begin processing");
|
||||
}
|
||||
|
||||
LOG.info("Deleting lease " + lease.getLeaseKey() + " as it has been completely processed and processing of child shard(s) has begun.");
|
||||
leaseManager.deleteLease(lease);
|
||||
|
||||
return true;
|
||||
return new CompletedShardResult(true, "");
|
||||
}
|
||||
|
||||
private void updateLeaseWithChildShards(LeasePendingDeletion leasePendingDeletion, Set<String> childShardKeys)
|
||||
|
|
@ -363,9 +374,17 @@ public class LeaseCleanupManager {
|
|||
boolean cleanedUpGarbageLease;
|
||||
boolean wereChildShardsPresent;
|
||||
boolean wasResourceNotFound;
|
||||
String cleanupFailureReason;
|
||||
|
||||
public boolean leaseCleanedUp() {
|
||||
return cleanedUpCompletedLease | cleanedUpGarbageLease;
|
||||
}
|
||||
}
|
||||
|
||||
@Value
|
||||
@Accessors(fluent = true)
|
||||
private static class CompletedShardResult {
|
||||
boolean cleanedUp;
|
||||
String failureMsg;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue