diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index e7e725f8..a69ea6ca 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -1045,6 +1045,10 @@ public class Worker implements Runnable { // Lost leases will force Worker to begin shutdown process for all shard consumers in // Worker.run(). leaseCoordinator.stop(); + + // Stop the lease cleanup manager + leaseCleanupManager.shutdown(); + // Stop the periodicShardSyncManager for the worker if (shardSyncStrategy != null) { shardSyncStrategy.onWorkerShutDown(); diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCleanupManager.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCleanupManager.java index 16f5f353..d19fc3ed 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCleanupManager.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCleanupManager.java @@ -121,6 +121,23 @@ public class LeaseCleanupManager { } } + /** + * Stops the lease cleanup thread, which is scheduled periodically as specified by + * {@link LeaseCleanupManager#leaseCleanupIntervalMillis} + */ + public void shutdown() { + if (isRunning) { + LOG.info("Stopping the lease cleanup thread."); + completedLeaseStopwatch.stop(); + garbageLeaseStopwatch.stop(); + deletionThreadPool.shutdown(); + + isRunning = false; + } else { + LOG.info("Lease cleanup thread already stopped."); + } + } + /** * Enqueues a lease for deletion without check for duplicate entry. Use {@link #isEnqueuedForDeletion} * for checking the duplicate entries. diff --git a/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseCleanupManagerTest.java b/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseCleanupManagerTest.java index f89ae644..23fda13d 100644 --- a/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseCleanupManagerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseCleanupManagerTest.java @@ -93,6 +93,18 @@ public class LeaseCleanupManagerTest { leaseCleanupManager.start(); } + /** + * Tests subsequent calls to shutdown {@link LeaseCleanupManager}. + */ + @Test + public final void testSubsequentShutdowns() { + leaseCleanupManager.start(); + Assert.assertTrue(leaseCleanupManager.isRunning()); + leaseCleanupManager.shutdown(); + Assert.assertFalse(leaseCleanupManager.isRunning()); + leaseCleanupManager.shutdown(); + } + /** * Tests that when both child shard leases are present, we are able to delete the parent shard for the completed * shard case.