Changes to enqueue lease for deletion in potential garbage cleanup scenario.

This commit is contained in:
Ashwin Giridharan 2020-06-24 12:39:18 -07:00
parent 1b86184428
commit a2b6019336
3 changed files with 16 additions and 47 deletions

View file

@ -105,7 +105,8 @@ public class LeaseCleanupManager {
} }
/** /**
* Enqueues a lease for deletion. * Enqueues a lease for deletion without check for duplicate entry. Use {@link #isEnqueuedForDeletion}
* for checking the duplicate entries.
* @param leasePendingDeletion * @param leasePendingDeletion
*/ */
public void enqueueForDeletion(LeasePendingDeletion leasePendingDeletion) { public void enqueueForDeletion(LeasePendingDeletion leasePendingDeletion) {

View file

@ -136,9 +136,19 @@ public class ShutdownTask implements ConsumerTask {
updateLeaseWithChildShards(currentShardLease); updateLeaseWithChildShards(currentShardLease);
} }
if (!leaseCleanupManager.isEnqueuedForDeletion(leasePendingDeletion)) { if (!leaseCleanupManager.isEnqueuedForDeletion(leasePendingDeletion)) {
attemptShardEndCheckpointing(scope, startTime); boolean isSuccess = false;
try {
isSuccess = attemptShardEndCheckpointing(scope, startTime);
} finally {
// Check if either the shard end ddb persist is successful or
// if childshards is empty. When child shards is empty then either it is due to
// completed shard being reprocessed or we got RNF from service.
// For these cases enqueue the lease for deletion.
if (isSuccess || !CollectionUtils.isNullOrEmpty(childShards)) {
leaseCleanupManager.enqueueForDeletion(leasePendingDeletion); leaseCleanupManager.enqueueForDeletion(leasePendingDeletion);
} }
}
}
} else { } else {
throwOnApplicationException(() -> shardRecordProcessor.leaseLost(LeaseLostInput.builder().build()), scope, startTime); throwOnApplicationException(() -> shardRecordProcessor.leaseLost(LeaseLostInput.builder().build()), scope, startTime);
} }
@ -169,7 +179,7 @@ public class ShutdownTask implements ConsumerTask {
return new TaskResult(exception); return new TaskResult(exception);
} }
private void attemptShardEndCheckpointing(MetricsScope scope, long startTime) private boolean attemptShardEndCheckpointing(MetricsScope scope, long startTime)
throws DependencyException, ProvisionedThroughputException, InvalidStateException, throws DependencyException, ProvisionedThroughputException, InvalidStateException,
CustomerApplicationException { CustomerApplicationException {
final Lease leaseFromDdb = Optional.ofNullable(leaseCoordinator.leaseRefresher().getLease(leaseKeyProvider.apply(shardInfo))) final Lease leaseFromDdb = Optional.ofNullable(leaseCoordinator.leaseRefresher().getLease(leaseKeyProvider.apply(shardInfo)))
@ -179,6 +189,7 @@ public class ShutdownTask implements ConsumerTask {
// The shardEnded is implemented by customer. We should validate if the SHARD_END checkpointing is successful after calling shardEnded. // The shardEnded is implemented by customer. We should validate if the SHARD_END checkpointing is successful after calling shardEnded.
throwOnApplicationException(() -> applicationCheckpointAndVerification(), scope, startTime); throwOnApplicationException(() -> applicationCheckpointAndVerification(), scope, startTime);
} }
return true;
} }
private void applicationCheckpointAndVerification() { private void applicationCheckpointAndVerification() {

View file

@ -225,49 +225,6 @@ public class LeaseCleanupManagerTest {
verify(leaseRefresher, times(1)).deleteLease(heldLease); verify(leaseRefresher, times(1)).deleteLease(heldLease);
} }
/**
* Tests that if a lease deletion fails, it's re-enqueued for deletion.
*/
@Test
public final void testFailedDeletionsReEnqueued() throws Exception {
shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(),
ExtendedSequenceNumber.LATEST);
final Lease heldLease = LeaseHelper.createLease(shardInfo.shardId(), "leaseOwner", Collections.singleton("parentShardId"));
when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher);
when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.shardId())).thenReturn(heldLease);
when(kinesis.getShardIterator(any(GetShardIteratorRequest.class))).thenThrow(Exception.class);
leaseCleanupManager.enqueueForDeletion(new LeasePendingDeletion(streamIdentifier, heldLease, shardInfo));
leaseCleanupManager.enqueueForDeletion(new LeasePendingDeletion(streamIdentifier, heldLease, shardInfo));
Assert.assertEquals(1, leaseCleanupManager.leasesPendingDeletion());
}
/**
* Tests duplicate leases are not enqueued for deletion.
*/
@Test
public final void testNoDuplicateLeasesEnqueued() {
// Disable lease cleanup so that the queue isn't drained while the test is running.
cleanupLeasesOfCompletedShards = false;
leaseCleanupManager = new LeaseCleanupManager(leaseCoordinator, kinesis,
NULL_METRICS_FACTORY, maxFutureWait, deletionThreadPool, cleanupLeasesOfCompletedShards, leaseCleanupIntervalMillis,
completedLeaseCleanupIntervalMillis, garbageLeaseCleanupIntervalMillis);
shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(),
ExtendedSequenceNumber.LATEST);
final Lease heldLease = LeaseHelper.createLease(shardInfo.shardId(), "leaseOwner", Collections.singleton("parentShardId"));
when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher);
when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.shardId())).thenReturn(heldLease);
// Enqueue the same lease twice.
leaseCleanupManager.enqueueForDeletion(new LeasePendingDeletion(streamIdentifier, heldLease, shardInfo));
Assert.assertEquals(1, leaseCleanupManager.leasesPendingDeletion());
leaseCleanupManager.enqueueForDeletion(new LeasePendingDeletion(streamIdentifier, heldLease, shardInfo));
Assert.assertEquals(1, leaseCleanupManager.leasesPendingDeletion());
}
private final void verifyExpectedDeletedLeasesCompletedShardCase(ShardInfo shardInfo, List<ChildShard> childShards, private final void verifyExpectedDeletedLeasesCompletedShardCase(ShardInfo shardInfo, List<ChildShard> childShards,
ExtendedSequenceNumber extendedSequenceNumber, ExtendedSequenceNumber extendedSequenceNumber,
int expectedDeletedLeases) throws Exception { int expectedDeletedLeases) throws Exception {