From f7130175282b62627bb6538f728dc797ae215a64 Mon Sep 17 00:00:00 2001 From: Chunxue Yang Date: Tue, 28 Jul 2020 13:11:57 -0700 Subject: [PATCH] LeaseCleanupManager change --- .../clientlibrary/lib/worker/Worker.java | 7 +------ .../leases/impl/LeaseCleanupManager.java | 17 ++++++++++------- .../leases/impl/LeaseCleanupManagerTest.java | 11 +++++++++++ 3 files changed, 22 insertions(+), 13 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 5b4f31e8..8e5cce85 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -733,12 +733,7 @@ public class Worker implements Runnable { } } - if (!leaseCleanupManager.isRunning()) { - LOG.info("Starting LeaseCleanupManager."); - leaseCleanupManager.start(); - } else { - LOG.info("LeaseCleanupManager is already running. No need to start it."); - } + leaseCleanupManager.start(); // If we reach this point, then we either skipped the lease sync or did not have any exception for the // shard sync in the previous attempt. diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCleanupManager.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCleanupManager.java index af02f588..b3157e78 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCleanupManager.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCleanupManager.java @@ -114,13 +114,16 @@ public class LeaseCleanupManager { * {@link LeaseCleanupManager#leaseCleanupIntervalMillis} */ public void start() { - LOG.debug("Starting lease cleanup thread."); - isRunning = true; - completedLeaseStopwatch.start(); - garbageLeaseStopwatch.start(); - - deletionThreadPool.scheduleAtFixedRate(new LeaseCleanupThread(), INITIAL_DELAY, leaseCleanupIntervalMillis, - TimeUnit.MILLISECONDS); + if (!isRunning) { + LOG.info("Starting lease cleanup thread."); + completedLeaseStopwatch.start(); + garbageLeaseStopwatch.start(); + deletionThreadPool.scheduleAtFixedRate(new LeaseCleanupThread(), INITIAL_DELAY, leaseCleanupIntervalMillis, + TimeUnit.MILLISECONDS); + isRunning = true; + } else { + LOG.info("Lease cleanup thread already running, no need to start."); + } } /** diff --git a/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseCleanupManagerTest.java b/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseCleanupManagerTest.java index 367c0ab0..f89ae644 100644 --- a/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseCleanupManagerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseCleanupManagerTest.java @@ -26,6 +26,7 @@ import com.amazonaws.services.kinesis.model.ChildShard; import com.amazonaws.services.kinesis.model.GetRecordsResult; import com.amazonaws.services.kinesis.model.ResourceNotFoundException; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -82,6 +83,16 @@ public class LeaseCleanupManagerTest { cleanupLeasesOfCompletedShards, leaseCleanupIntervalMillis, completedLeaseCleanupIntervalMillis, garbageLeaseCleanupIntervalMillis, maxRecords); } + /** + * Tests subsequent calls to start {@link LeaseCleanupManager}. + */ + @Test + public final void testSubsequentStarts() { + leaseCleanupManager.start(); + Assert.assertTrue(leaseCleanupManager.isRunning()); + leaseCleanupManager.start(); + } + /** * Tests that when both child shard leases are present, we are able to delete the parent shard for the completed * shard case.