Merge pull request #84 from rankinc/lease-renewal-threads

Always shutdown leaseRenewalThreadpool on exit.
This commit is contained in:
Justin Pfifer 2016-07-21 09:58:26 -07:00 committed by GitHub
commit d695c0ab80

View file

@ -46,7 +46,7 @@ import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
* LeaseCoordinator abstracts away LeaseTaker and LeaseRenewer from the application code that's using leasing. It owns * LeaseCoordinator abstracts away LeaseTaker and LeaseRenewer from the application code that's using leasing. It owns
* the scheduling of the two previously mentioned components as well as informing LeaseRenewer when LeaseTaker takes new * the scheduling of the two previously mentioned components as well as informing LeaseRenewer when LeaseTaker takes new
* leases. * leases.
* *
*/ */
public class LeaseCoordinator<T extends Lease> { public class LeaseCoordinator<T extends Lease> {
@ -80,7 +80,7 @@ public class LeaseCoordinator<T extends Lease> {
protected final IMetricsFactory metricsFactory; protected final IMetricsFactory metricsFactory;
private ScheduledExecutorService leaseCoordinatorThreadPool; private ScheduledExecutorService leaseCoordinatorThreadPool;
private ExecutorService leaseRenewalThreadpool; private final ExecutorService leaseRenewalThreadpool;
private volatile boolean running = false; private volatile boolean running = false;
/** /**
@ -206,7 +206,7 @@ public class LeaseCoordinator<T extends Lease> {
/** /**
* Runs a single iteration of the lease taker - used by integration tests. * Runs a single iteration of the lease taker - used by integration tests.
* *
* @throws InvalidStateException * @throws InvalidStateException
* @throws DependencyException * @throws DependencyException
*/ */
@ -235,7 +235,7 @@ public class LeaseCoordinator<T extends Lease> {
/** /**
* Runs a single iteration of the lease renewer - used by integration tests. * Runs a single iteration of the lease renewer - used by integration tests.
* *
* @throws InvalidStateException * @throws InvalidStateException
* @throws DependencyException * @throws DependencyException
*/ */
@ -263,7 +263,7 @@ public class LeaseCoordinator<T extends Lease> {
/** /**
* @param leaseKey lease key to fetch currently held lease for * @param leaseKey lease key to fetch currently held lease for
* *
* @return deep copy of currently held Lease for given key, or null if we don't hold the lease for that key * @return deep copy of currently held Lease for given key, or null if we don't hold the lease for that key
*/ */
public T getCurrentlyHeldLease(String leaseKey) { public T getCurrentlyHeldLease(String leaseKey) {
@ -290,7 +290,6 @@ public class LeaseCoordinator<T extends Lease> {
leaseTaker.getWorkerIdentifier())); leaseTaker.getWorkerIdentifier()));
} else { } else {
leaseCoordinatorThreadPool.shutdownNow(); leaseCoordinatorThreadPool.shutdownNow();
leaseRenewalThreadpool.shutdownNow();
LOG.info(String.format("Worker %s stopped lease-tracking threads %dms after stop", LOG.info(String.format("Worker %s stopped lease-tracking threads %dms after stop",
leaseTaker.getWorkerIdentifier(), leaseTaker.getWorkerIdentifier(),
STOP_WAIT_TIME_MILLIS)); STOP_WAIT_TIME_MILLIS));
@ -302,6 +301,7 @@ public class LeaseCoordinator<T extends Lease> {
LOG.debug("Threadpool was null, no need to shutdown/terminate threadpool."); LOG.debug("Threadpool was null, no need to shutdown/terminate threadpool.");
} }
leaseRenewalThreadpool.shutdownNow();
synchronized (shutdownLock) { synchronized (shutdownLock) {
leaseRenewer.clearCurrentlyHeldLeases(); leaseRenewer.clearCurrentlyHeldLeases();
running = false; running = false;
@ -317,12 +317,12 @@ public class LeaseCoordinator<T extends Lease> {
/** /**
* Updates application-specific lease values in DynamoDB. * Updates application-specific lease values in DynamoDB.
* *
* @param lease lease object containing updated values * @param lease lease object containing updated values
* @param concurrencyToken obtained by calling Lease.getConcurrencyToken for a currently held lease * @param concurrencyToken obtained by calling Lease.getConcurrencyToken for a currently held lease
* *
* @return true if update succeeded, false otherwise * @return true if update succeeded, false otherwise
* *
* @throws InvalidStateException if lease table does not exist * @throws InvalidStateException if lease table does not exist
* @throws ProvisionedThroughputException if DynamoDB update fails due to lack of capacity * @throws ProvisionedThroughputException if DynamoDB update fails due to lack of capacity
* @throws DependencyException if DynamoDB update fails in an unexpected way * @throws DependencyException if DynamoDB update fails in an unexpected way