Update the Worker shutdown logic to make sure that the LeaseCleanup Manager also terminates all the threads that it has started (#817)
Co-authored-by: kmz <kmz@users.noreply.github.com>
This commit is contained in:
parent
f65b3fd43b
commit
a25835678a
3 changed files with 32 additions and 0 deletions
|
|
@ -828,6 +828,10 @@ public class Scheduler implements Runnable {
|
||||||
// Lost leases will force Worker to begin shutdown process for all shard consumers in
|
// Lost leases will force Worker to begin shutdown process for all shard consumers in
|
||||||
// Worker.run().
|
// Worker.run().
|
||||||
leaseCoordinator.stop();
|
leaseCoordinator.stop();
|
||||||
|
|
||||||
|
// Stop the lease cleanup manager
|
||||||
|
leaseCleanupManager.shutdown();
|
||||||
|
|
||||||
leaderElectedPeriodicShardSyncManager.stop();
|
leaderElectedPeriodicShardSyncManager.stop();
|
||||||
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.SHUT_DOWN);
|
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.SHUT_DOWN);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -93,6 +93,22 @@ public class LeaseCleanupManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Stops the lease cleanup thread, which is scheduled periodically as specified by
|
||||||
|
* {@link LeaseCleanupManager#leaseCleanupIntervalMillis}
|
||||||
|
*/
|
||||||
|
public void shutdown() {
|
||||||
|
if (isRunning) {
|
||||||
|
log.info("Stopping the lease cleanup thread.");
|
||||||
|
completedLeaseStopwatch.stop();
|
||||||
|
garbageLeaseStopwatch.stop();
|
||||||
|
deletionThreadPool.shutdown();
|
||||||
|
isRunning = false;
|
||||||
|
} else {
|
||||||
|
log.info("Lease cleanup thread already stopped.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Enqueues a lease for deletion without check for duplicate entry. Use {@link #isEnqueuedForDeletion}
|
* Enqueues a lease for deletion without check for duplicate entry. Use {@link #isEnqueuedForDeletion}
|
||||||
* for checking the duplicate entries.
|
* for checking the duplicate entries.
|
||||||
|
|
|
||||||
|
|
@ -94,6 +94,18 @@ public class LeaseCleanupManagerTest {
|
||||||
leaseCleanupManager.start();
|
leaseCleanupManager.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests subsequent calls to shutdown {@link LeaseCleanupManager}.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public final void testSubsequentShutdowns() {
|
||||||
|
leaseCleanupManager.start();
|
||||||
|
Assert.assertTrue(leaseCleanupManager.isRunning());
|
||||||
|
leaseCleanupManager.shutdown();
|
||||||
|
Assert.assertFalse(leaseCleanupManager.isRunning());
|
||||||
|
leaseCleanupManager.shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests that when both child shard leases are present, we are able to delete the parent shard for the completed
|
* Tests that when both child shard leases are present, we are able to delete the parent shard for the completed
|
||||||
* shard case.
|
* shard case.
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue