diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCleanupManager.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCleanupManager.java index 49a0ca36..f6ff8ee0 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCleanupManager.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCleanupManager.java @@ -176,6 +176,7 @@ public class LeaseCleanupManager { boolean alreadyCheckedForGarbageCollection = false; boolean wereChildShardsPresent = false; boolean wasResourceNotFound = false; + String cleanupFailureReason = ""; try { if (cleanupLeasesUponShardCompletion && timeToCheckForCompletedShard) { @@ -184,53 +185,55 @@ public class LeaseCleanupManager { Set childShardKeys = leaseFromDDB.getChildShardIds(); if (CollectionUtils.isNullOrEmpty(childShardKeys)) { try { + // throws ResourceNotFoundException childShardKeys = getChildShardsFromService(shardInfo); - if (CollectionUtils.isNullOrEmpty(childShardKeys)) { LOG.error("No child shards returned from service for shard " + shardInfo.getShardId()); // If no children shard is found in DDB and from service, then do not delete the lease throw new InvalidStateException("No child shards found for this supposedly " + "closed shard in both local DDB and in service " + shardInfo.getShardId()); - } else { wereChildShardsPresent = true; updateLeaseWithChildShards(leasePendingDeletion, childShardKeys); } - } catch (ResourceNotFoundException e) { - throw e; } finally { + // We rely on resource presence in service for garbage collection. Since we already + // made a call to getChildShardsFromService we would be coming to know if the resource + // is present of not. In latter case, we would throw ResourceNotFoundException, which is + // handled in catch block. alreadyCheckedForGarbageCollection = true; } } else { wereChildShardsPresent = true; } try { - cleanedUpCompletedLease = cleanupLeaseForCompletedShard(lease, childShardKeys); + final CompletedShardResult completedShardResult = cleanupLeaseForCompletedShard(lease, childShardKeys); + cleanedUpCompletedLease = completedShardResult.cleanedUp(); + cleanupFailureReason = completedShardResult.failureMsg(); } catch (Exception e) { // Suppressing the exception here, so that we can attempt for garbage cleanup. - LOG.warn("Unable to cleanup lease for shard " + shardInfo.getShardId()); + LOG.warn("Unable to cleanup lease for shard " + shardInfo.getShardId() + " due to " + e.getMessage()); } } else { LOG.info("Lease not present in lease table while cleaning the shard " + shardInfo.getShardId()); cleanedUpCompletedLease = true; } } - - if (!alreadyCheckedForGarbageCollection && timeToCheckForGarbageShard) { - try { - wereChildShardsPresent = !CollectionUtils + if (!cleanedUpCompletedLease && !alreadyCheckedForGarbageCollection && timeToCheckForGarbageShard) { + // throws ResourceNotFoundException + wereChildShardsPresent = !CollectionUtils .isNullOrEmpty(getChildShardsFromService(shardInfo)); - } catch (ResourceNotFoundException e) { - throw e; - } } } catch (ResourceNotFoundException e) { wasResourceNotFound = true; cleanedUpGarbageLease = cleanupLeaseForGarbageShard(lease); + cleanupFailureReason = cleanedUpGarbageLease ? "" : "DDB Lease Deletion Failed"; + } catch (Exception e) { + LOG.warn("Unable to cleanup lease for shard " + shardInfo.getShardId() + " : " + e.getMessage()); + cleanupFailureReason = e.getMessage(); } - return new LeaseCleanupResult(cleanedUpCompletedLease, cleanedUpGarbageLease, wereChildShardsPresent, - wasResourceNotFound); + wasResourceNotFound, cleanupFailureReason); } private Set getChildShardsFromService(ShardInfo shardInfo) { @@ -238,12 +241,16 @@ public class LeaseCleanupManager { return kinesisProxy.get(iterator, maxRecords).getChildShards().stream().map(c -> c.getShardId()).collect(Collectors.toSet()); } - // A lease that ended with SHARD_END from ResourceNotFoundException is safe to delete if it no longer exists in the // stream (known explicitly from ResourceNotFound being thrown when processing this shard), private boolean cleanupLeaseForGarbageShard(KinesisClientLease lease) throws DependencyException, ProvisionedThroughputException, InvalidStateException { LOG.info("Deleting lease " + lease.getLeaseKey() + " as it is not present in the stream."); - leaseManager.deleteLease(lease); + try { + leaseManager.deleteLease(lease); + } catch (Exception e) { + LOG.warn("Lease deletion failed for " + lease.getLeaseKey() + " due to " + e.getMessage()); + return false; + } return true; } @@ -263,8 +270,9 @@ public class LeaseCleanupManager { // We should only be deleting the current shard's lease if // 1. All of its children are currently being processed, i.e their checkpoint is not TRIM_HORIZON or AT_TIMESTAMP. // 2. Its parent shard lease(s) have already been deleted. - private boolean cleanupLeaseForCompletedShard(KinesisClientLease lease, Set childShardLeaseKeys) + private CompletedShardResult cleanupLeaseForCompletedShard(KinesisClientLease lease, Set childShardLeaseKeys) throws DependencyException, ProvisionedThroughputException, InvalidStateException, IllegalStateException { + final Set processedChildShardLeaseKeys = new HashSet<>(); for (String childShardLeaseKey : childShardLeaseKeys) { @@ -280,14 +288,17 @@ public class LeaseCleanupManager { } } - if (!allParentShardLeasesDeleted(lease) || !Objects.equals(childShardLeaseKeys, processedChildShardLeaseKeys)) { - return false; + boolean parentShardsDeleted = allParentShardLeasesDeleted(lease); + boolean childrenStartedProcessing = Objects.equals(childShardLeaseKeys, processedChildShardLeaseKeys); + + if (!parentShardsDeleted || !childrenStartedProcessing) { + return new CompletedShardResult(false, !parentShardsDeleted ? "Parent shard(s) not deleted yet" : "Child shard(s) yet to begin processing"); } LOG.info("Deleting lease " + lease.getLeaseKey() + " as it has been completely processed and processing of child shard(s) has begun."); leaseManager.deleteLease(lease); - return true; + return new CompletedShardResult(true, ""); } private void updateLeaseWithChildShards(LeasePendingDeletion leasePendingDeletion, Set childShardKeys) @@ -363,9 +374,17 @@ public class LeaseCleanupManager { boolean cleanedUpGarbageLease; boolean wereChildShardsPresent; boolean wasResourceNotFound; + String cleanupFailureReason; public boolean leaseCleanedUp() { return cleanedUpCompletedLease | cleanedUpGarbageLease; } } + + @Value + @Accessors(fluent = true) + private static class CompletedShardResult { + boolean cleanedUp; + String failureMsg; + } }