diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index db9cc145..ed2f889c 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -348,12 +348,7 @@ public class Scheduler implements Runnable { log.info("Skipping shard sync per configuration setting (and lease table is not empty)"); } - if (!leaseCleanupManager.isRunning()) { - log.info("Starting LeaseCleanupManager."); - leaseCleanupManager.start(); - } else { - log.info("LeaseCleanupManager is already running. No need to start it"); - } + leaseCleanupManager.start(); // If we reach this point, then we either skipped the lease sync or did not have any exception // for any of the shard sync in the previous attempt. diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java index de734646..6e3104ae 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java @@ -81,12 +81,16 @@ public class LeaseCleanupManager { * {@link LeaseCleanupManager#leaseCleanupIntervalMillis} */ public void start() { - log.debug("Starting lease cleanup thread."); - completedLeaseStopwatch.start(); - garbageLeaseStopwatch.start(); - - deletionThreadPool.scheduleAtFixedRate(new LeaseCleanupThread(), INITIAL_DELAY, leaseCleanupIntervalMillis, - TimeUnit.MILLISECONDS); + if (!isRunning) { + log.info("Starting lease cleanup thread."); + completedLeaseStopwatch.reset().start(); + garbageLeaseStopwatch.reset().start(); + deletionThreadPool.scheduleAtFixedRate(new LeaseCleanupThread(), INITIAL_DELAY, leaseCleanupIntervalMillis, + TimeUnit.MILLISECONDS); + isRunning = true; + } else { + log.info("Lease cleanup thread already running, no need to start."); + } } /** diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCleanupManagerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCleanupManagerTest.java index e9d237f9..02c71b03 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCleanupManagerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCleanupManagerTest.java @@ -15,6 +15,7 @@ package software.amazon.kinesis.leases; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -83,6 +84,16 @@ public class LeaseCleanupManagerTest { when(leaseCoordinator.updateLease(any(Lease.class), any(UUID.class), any(String.class), any(String.class))).thenReturn(true); } + /** + * Tests subsequent calls to start {@link LeaseCleanupManager}. + */ + @Test + public final void testSubsequentStarts() { + leaseCleanupManager.start(); + Assert.assertTrue(leaseCleanupManager.isRunning()); + leaseCleanupManager.start(); + } + /** * Tests that when both child shard leases are present, we are able to delete the parent shard for the completed * shard case.