LeaseCleanupManager change
This commit is contained in:
parent
c3b41c3b55
commit
f713017528
3 changed files with 22 additions and 13 deletions
|
|
@ -733,12 +733,7 @@ public class Worker implements Runnable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!leaseCleanupManager.isRunning()) {
|
|
||||||
LOG.info("Starting LeaseCleanupManager.");
|
|
||||||
leaseCleanupManager.start();
|
leaseCleanupManager.start();
|
||||||
} else {
|
|
||||||
LOG.info("LeaseCleanupManager is already running. No need to start it.");
|
|
||||||
}
|
|
||||||
|
|
||||||
// If we reach this point, then we either skipped the lease sync or did not have any exception for the
|
// If we reach this point, then we either skipped the lease sync or did not have any exception for the
|
||||||
// shard sync in the previous attempt.
|
// shard sync in the previous attempt.
|
||||||
|
|
|
||||||
|
|
@ -114,13 +114,16 @@ public class LeaseCleanupManager {
|
||||||
* {@link LeaseCleanupManager#leaseCleanupIntervalMillis}
|
* {@link LeaseCleanupManager#leaseCleanupIntervalMillis}
|
||||||
*/
|
*/
|
||||||
public void start() {
|
public void start() {
|
||||||
LOG.debug("Starting lease cleanup thread.");
|
if (!isRunning) {
|
||||||
isRunning = true;
|
LOG.info("Starting lease cleanup thread.");
|
||||||
completedLeaseStopwatch.start();
|
completedLeaseStopwatch.start();
|
||||||
garbageLeaseStopwatch.start();
|
garbageLeaseStopwatch.start();
|
||||||
|
|
||||||
deletionThreadPool.scheduleAtFixedRate(new LeaseCleanupThread(), INITIAL_DELAY, leaseCleanupIntervalMillis,
|
deletionThreadPool.scheduleAtFixedRate(new LeaseCleanupThread(), INITIAL_DELAY, leaseCleanupIntervalMillis,
|
||||||
TimeUnit.MILLISECONDS);
|
TimeUnit.MILLISECONDS);
|
||||||
|
isRunning = true;
|
||||||
|
} else {
|
||||||
|
LOG.info("Lease cleanup thread already running, no need to start.");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -26,6 +26,7 @@ import com.amazonaws.services.kinesis.model.ChildShard;
|
||||||
import com.amazonaws.services.kinesis.model.GetRecordsResult;
|
import com.amazonaws.services.kinesis.model.GetRecordsResult;
|
||||||
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
|
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.runner.RunWith;
|
import org.junit.runner.RunWith;
|
||||||
|
|
@ -82,6 +83,16 @@ public class LeaseCleanupManagerTest {
|
||||||
cleanupLeasesOfCompletedShards, leaseCleanupIntervalMillis, completedLeaseCleanupIntervalMillis, garbageLeaseCleanupIntervalMillis, maxRecords);
|
cleanupLeasesOfCompletedShards, leaseCleanupIntervalMillis, completedLeaseCleanupIntervalMillis, garbageLeaseCleanupIntervalMillis, maxRecords);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests subsequent calls to start {@link LeaseCleanupManager}.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public final void testSubsequentStarts() {
|
||||||
|
leaseCleanupManager.start();
|
||||||
|
Assert.assertTrue(leaseCleanupManager.isRunning());
|
||||||
|
leaseCleanupManager.start();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests that when both child shard leases are present, we are able to delete the parent shard for the completed
|
* Tests that when both child shard leases are present, we are able to delete the parent shard for the completed
|
||||||
* shard case.
|
* shard case.
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue