Merge pull request #65 from ashwing/ltr_1_lease_cleanup_logging
Ltr 1 lease cleanup logging
This commit is contained in:
commit
842c755511
4 changed files with 21 additions and 53 deletions
|
|
@ -105,7 +105,8 @@ public class LeaseCleanupManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Enqueues a lease for deletion.
|
* Enqueues a lease for deletion without checking for a duplicate entry. Use {@link #isEnqueuedForDeletion}
|
||||||
|
* to check for duplicate entries.
|
||||||
* @param leasePendingDeletion
|
* @param leasePendingDeletion
|
||||||
*/
|
*/
|
||||||
public void enqueueForDeletion(LeasePendingDeletion leasePendingDeletion) {
|
public void enqueueForDeletion(LeasePendingDeletion leasePendingDeletion) {
|
||||||
|
|
@ -135,7 +136,7 @@ public class LeaseCleanupManager {
|
||||||
* Returns how many leases are currently waiting in the queue pending deletion.
|
* Returns how many leases are currently waiting in the queue pending deletion.
|
||||||
* @return number of leases pending deletion.
|
* @return number of leases pending deletion.
|
||||||
*/
|
*/
|
||||||
public int leasesPendingDeletion() {
|
private int leasesPendingDeletion() {
|
||||||
return deletionQueue.size();
|
return deletionQueue.size();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -310,6 +311,7 @@ public class LeaseCleanupManager {
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
void cleanupLeases() {
|
void cleanupLeases() {
|
||||||
|
log.info("Number of pending leases to clean before the scan : {}", leasesPendingDeletion());
|
||||||
if (deletionQueue.isEmpty()) {
|
if (deletionQueue.isEmpty()) {
|
||||||
log.debug("No leases pending deletion.");
|
log.debug("No leases pending deletion.");
|
||||||
} else if (timeToCheckForCompletedShard() | timeToCheckForGarbageShard()) {
|
} else if (timeToCheckForCompletedShard() | timeToCheckForGarbageShard()) {
|
||||||
|
|
@ -340,24 +342,22 @@ public class LeaseCleanupManager {
|
||||||
log.error("Failed to cleanup lease {} for {}. Will re-enqueue for deletion and retry on next " +
|
log.error("Failed to cleanup lease {} for {}. Will re-enqueue for deletion and retry on next " +
|
||||||
"scheduled execution.", leaseKey, streamIdentifier, e);
|
"scheduled execution.", leaseKey, streamIdentifier, e);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!deletionSucceeded) {
|
if (!deletionSucceeded) {
|
||||||
log.debug("Did not cleanup lease {} for {}. Re-enqueueing for deletion.", leaseKey, streamIdentifier);
|
log.debug("Did not cleanup lease {} for {}. Re-enqueueing for deletion.", leaseKey, streamIdentifier);
|
||||||
failedDeletions.add(leasePendingDeletion);
|
failedDeletions.add(leasePendingDeletion);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (completedLeaseCleanedUp) {
|
if (completedLeaseCleanedUp) {
|
||||||
log.debug("At least one completed lease was cleaned up - restarting interval");
|
log.debug("At least one completed lease was cleaned up - restarting interval");
|
||||||
completedLeaseStopwatch.reset().start();
|
completedLeaseStopwatch.reset().start();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (garbageLeaseCleanedUp) {
|
if (garbageLeaseCleanedUp) {
|
||||||
log.debug("At least one garbage lease was cleaned up - restarting interval");
|
log.debug("At least one garbage lease was cleaned up - restarting interval");
|
||||||
garbageLeaseStopwatch.reset().start();
|
garbageLeaseStopwatch.reset().start();
|
||||||
}
|
}
|
||||||
|
|
||||||
deletionQueue.addAll(failedDeletions);
|
deletionQueue.addAll(failedDeletions);
|
||||||
|
|
||||||
|
log.info("Number of pending leases to clean after the scan : {}", leasesPendingDeletion());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -136,8 +136,18 @@ public class ShutdownTask implements ConsumerTask {
|
||||||
updateLeaseWithChildShards(currentShardLease);
|
updateLeaseWithChildShards(currentShardLease);
|
||||||
}
|
}
|
||||||
if (!leaseCleanupManager.isEnqueuedForDeletion(leasePendingDeletion)) {
|
if (!leaseCleanupManager.isEnqueuedForDeletion(leasePendingDeletion)) {
|
||||||
attemptShardEndCheckpointing(scope, startTime);
|
boolean isSuccess = false;
|
||||||
leaseCleanupManager.enqueueForDeletion(leasePendingDeletion);
|
try {
|
||||||
|
isSuccess = attemptShardEndCheckpointing(scope, startTime);
|
||||||
|
} finally {
|
||||||
|
// Check whether either the shard-end persist to DDB succeeded or
|
||||||
|
// the child shards are empty. Child shards are empty either because a
|
||||||
|
// completed shard is being reprocessed or because we got an RNF (ResourceNotFound) from the service.
|
||||||
|
// For these cases enqueue the lease for deletion.
|
||||||
|
if (isSuccess || CollectionUtils.isNullOrEmpty(childShards)) {
|
||||||
|
leaseCleanupManager.enqueueForDeletion(leasePendingDeletion);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
throwOnApplicationException(() -> shardRecordProcessor.leaseLost(LeaseLostInput.builder().build()), scope, startTime);
|
throwOnApplicationException(() -> shardRecordProcessor.leaseLost(LeaseLostInput.builder().build()), scope, startTime);
|
||||||
|
|
@ -169,7 +179,7 @@ public class ShutdownTask implements ConsumerTask {
|
||||||
return new TaskResult(exception);
|
return new TaskResult(exception);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void attemptShardEndCheckpointing(MetricsScope scope, long startTime)
|
private boolean attemptShardEndCheckpointing(MetricsScope scope, long startTime)
|
||||||
throws DependencyException, ProvisionedThroughputException, InvalidStateException,
|
throws DependencyException, ProvisionedThroughputException, InvalidStateException,
|
||||||
CustomerApplicationException {
|
CustomerApplicationException {
|
||||||
final Lease leaseFromDdb = Optional.ofNullable(leaseCoordinator.leaseRefresher().getLease(leaseKeyProvider.apply(shardInfo)))
|
final Lease leaseFromDdb = Optional.ofNullable(leaseCoordinator.leaseRefresher().getLease(leaseKeyProvider.apply(shardInfo)))
|
||||||
|
|
@ -179,6 +189,7 @@ public class ShutdownTask implements ConsumerTask {
|
||||||
// The shardEnded is implemented by customer. We should validate if the SHARD_END checkpointing is successful after calling shardEnded.
|
// The shardEnded is implemented by customer. We should validate if the SHARD_END checkpointing is successful after calling shardEnded.
|
||||||
throwOnApplicationException(() -> applicationCheckpointAndVerification(), scope, startTime);
|
throwOnApplicationException(() -> applicationCheckpointAndVerification(), scope, startTime);
|
||||||
}
|
}
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void applicationCheckpointAndVerification() {
|
private void applicationCheckpointAndVerification() {
|
||||||
|
|
|
||||||
|
|
@ -225,49 +225,6 @@ public class LeaseCleanupManagerTest {
|
||||||
verify(leaseRefresher, times(1)).deleteLease(heldLease);
|
verify(leaseRefresher, times(1)).deleteLease(heldLease);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Tests that if a lease deletion fails, it's re-enqueued for deletion.
|
|
||||||
*/
|
|
||||||
@Test
|
|
||||||
public final void testFailedDeletionsReEnqueued() throws Exception {
|
|
||||||
shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(),
|
|
||||||
ExtendedSequenceNumber.LATEST);
|
|
||||||
|
|
||||||
final Lease heldLease = LeaseHelper.createLease(shardInfo.shardId(), "leaseOwner", Collections.singleton("parentShardId"));
|
|
||||||
|
|
||||||
when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher);
|
|
||||||
when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.shardId())).thenReturn(heldLease);
|
|
||||||
when(kinesis.getShardIterator(any(GetShardIteratorRequest.class))).thenThrow(Exception.class);
|
|
||||||
|
|
||||||
leaseCleanupManager.enqueueForDeletion(new LeasePendingDeletion(streamIdentifier, heldLease, shardInfo));
|
|
||||||
leaseCleanupManager.enqueueForDeletion(new LeasePendingDeletion(streamIdentifier, heldLease, shardInfo));
|
|
||||||
Assert.assertEquals(1, leaseCleanupManager.leasesPendingDeletion());
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Tests duplicate leases are not enqueued for deletion.
|
|
||||||
*/
|
|
||||||
@Test
|
|
||||||
public final void testNoDuplicateLeasesEnqueued() {
|
|
||||||
// Disable lease cleanup so that the queue isn't drained while the test is running.
|
|
||||||
cleanupLeasesOfCompletedShards = false;
|
|
||||||
leaseCleanupManager = new LeaseCleanupManager(leaseCoordinator, kinesis,
|
|
||||||
NULL_METRICS_FACTORY, maxFutureWait, deletionThreadPool, cleanupLeasesOfCompletedShards, leaseCleanupIntervalMillis,
|
|
||||||
completedLeaseCleanupIntervalMillis, garbageLeaseCleanupIntervalMillis);
|
|
||||||
shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(),
|
|
||||||
ExtendedSequenceNumber.LATEST);
|
|
||||||
final Lease heldLease = LeaseHelper.createLease(shardInfo.shardId(), "leaseOwner", Collections.singleton("parentShardId"));
|
|
||||||
|
|
||||||
when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher);
|
|
||||||
when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.shardId())).thenReturn(heldLease);
|
|
||||||
|
|
||||||
// Enqueue the same lease twice.
|
|
||||||
leaseCleanupManager.enqueueForDeletion(new LeasePendingDeletion(streamIdentifier, heldLease, shardInfo));
|
|
||||||
Assert.assertEquals(1, leaseCleanupManager.leasesPendingDeletion());
|
|
||||||
leaseCleanupManager.enqueueForDeletion(new LeasePendingDeletion(streamIdentifier, heldLease, shardInfo));
|
|
||||||
Assert.assertEquals(1, leaseCleanupManager.leasesPendingDeletion());
|
|
||||||
}
|
|
||||||
|
|
||||||
private final void verifyExpectedDeletedLeasesCompletedShardCase(ShardInfo shardInfo, List<ChildShard> childShards,
|
private final void verifyExpectedDeletedLeasesCompletedShardCase(ShardInfo shardInfo, List<ChildShard> childShards,
|
||||||
ExtendedSequenceNumber extendedSequenceNumber,
|
ExtendedSequenceNumber extendedSequenceNumber,
|
||||||
int expectedDeletedLeases) throws Exception {
|
int expectedDeletedLeases) throws Exception {
|
||||||
|
|
|
||||||
|
|
@ -153,7 +153,7 @@ public class ShutdownTaskTest {
|
||||||
|
|
||||||
final TaskResult result = task.call();
|
final TaskResult result = task.call();
|
||||||
assertNotNull(result.getException());
|
assertNotNull(result.getException());
|
||||||
assertTrue(result.getException() instanceof KinesisClientLibIOException);
|
assertTrue(result.getException() instanceof IllegalStateException);
|
||||||
verify(recordsPublisher, never()).shutdown();
|
verify(recordsPublisher, never()).shutdown();
|
||||||
verify(shardRecordProcessor, never()).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build());
|
verify(shardRecordProcessor, never()).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build());
|
||||||
verify(shardRecordProcessor, never()).leaseLost(LeaseLostInput.builder().build());
|
verify(shardRecordProcessor, never()).leaseLost(LeaseLostInput.builder().build());
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue