From a25835678a0ca0e43caf03b0b326afeeb7e9e658 Mon Sep 17 00:00:00 2001 From: kmz <6367575+kmz@users.noreply.github.com> Date: Thu, 1 Jul 2021 09:32:16 +0900 Subject: [PATCH] Update the Worker shutdown logic to make sure that the LeaseCleanup Manager also terminates all the threads that it has started (#817) Co-authored-by: kmz --- .../amazon/kinesis/coordinator/Scheduler.java | 4 ++++ .../kinesis/leases/LeaseCleanupManager.java | 16 ++++++++++++++++ .../kinesis/leases/LeaseCleanupManagerTest.java | 12 ++++++++++++ 3 files changed, 32 insertions(+) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index a33aac3d..e8138e0b 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -828,6 +828,10 @@ public class Scheduler implements Runnable { // Lost leases will force Worker to begin shutdown process for all shard consumers in // Worker.run(). leaseCoordinator.stop(); + + // Stop the lease cleanup manager + leaseCleanupManager.shutdown(); + leaderElectedPeriodicShardSyncManager.stop(); workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.SHUT_DOWN); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java index 6e3104ae..a5928c2a 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java @@ -93,6 +93,22 @@ public class LeaseCleanupManager { } } + /** + * Stops the lease cleanup thread, which is scheduled periodically as specified by + * {@link LeaseCleanupManager#leaseCleanupIntervalMillis} + */ + public void shutdown() { + if (isRunning) { + log.info("Stopping the lease cleanup thread."); + completedLeaseStopwatch.stop(); + garbageLeaseStopwatch.stop(); + deletionThreadPool.shutdown(); + isRunning = false; + } else { + log.info("Lease cleanup thread already stopped."); + } + } + /** * Enqueues a lease for deletion without check for duplicate entry. Use {@link #isEnqueuedForDeletion} * for checking the duplicate entries. diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCleanupManagerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCleanupManagerTest.java index 02c71b03..2e691844 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCleanupManagerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCleanupManagerTest.java @@ -94,6 +94,18 @@ public class LeaseCleanupManagerTest { leaseCleanupManager.start(); } + /** + * Tests subsequent calls to shutdown {@link LeaseCleanupManager}. + */ + @Test + public final void testSubsequentShutdowns() { + leaseCleanupManager.start(); + Assert.assertTrue(leaseCleanupManager.isRunning()); + leaseCleanupManager.shutdown(); + Assert.assertFalse(leaseCleanupManager.isRunning()); + leaseCleanupManager.shutdown(); + } + /** * Tests that when both child shard leases are present, we are able to delete the parent shard for the completed * shard case.