Fix NPE on graceful shutdown before DDB LeaseCoordinator starts. (#1157)
This commit is contained in:
parent
a9b0d00852
commit
feadd5e043
2 changed files with 27 additions and 10 deletions
|
|
@ -329,8 +329,9 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void stopLeaseTaker() {
|
public void stopLeaseTaker() {
|
||||||
takerFuture.cancel(false);
|
if (takerFuture != null) {
|
||||||
|
takerFuture.cancel(false);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,5 @@
|
||||||
package software.amazon.kinesis.leases.dynamodb;
|
package software.amazon.kinesis.leases.dynamodb;
|
||||||
|
|
||||||
import org.junit.Assert;
|
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.runner.RunWith;
|
import org.junit.runner.RunWith;
|
||||||
|
|
@ -12,7 +11,7 @@ import software.amazon.kinesis.metrics.MetricsFactory;
|
||||||
|
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
|
||||||
import static org.mockito.Mockito.times;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
|
@ -51,17 +50,34 @@ public class DynamoDBLeaseCoordinatorTest {
|
||||||
|
|
||||||
leaseCoordinator.initialize();
|
leaseCoordinator.initialize();
|
||||||
|
|
||||||
verify(leaseRefresher, times(1)).createLeaseTableIfNotExists();
|
verify(leaseRefresher).createLeaseTableIfNotExists();
|
||||||
verify(leaseRefresher, times(1)).waitUntilLeaseTableExists(SECONDS_BETWEEN_POLLS, TIMEOUT_SECONDS);
|
verify(leaseRefresher).waitUntilLeaseTableExists(SECONDS_BETWEEN_POLLS, TIMEOUT_SECONDS);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test(expected = DependencyException.class)
|
||||||
public void testInitialize_tableCreationFails() throws Exception {
|
public void testInitialize_tableCreationFails() throws Exception {
|
||||||
when(leaseRefresher.createLeaseTableIfNotExists()).thenReturn(false);
|
when(leaseRefresher.createLeaseTableIfNotExists()).thenReturn(false);
|
||||||
when(leaseRefresher.waitUntilLeaseTableExists(SECONDS_BETWEEN_POLLS, TIMEOUT_SECONDS)).thenReturn(false);
|
when(leaseRefresher.waitUntilLeaseTableExists(SECONDS_BETWEEN_POLLS, TIMEOUT_SECONDS)).thenReturn(false);
|
||||||
|
|
||||||
Assert.assertThrows(DependencyException.class, () -> leaseCoordinator.initialize());
|
try {
|
||||||
verify(leaseRefresher, times(1)).createLeaseTableIfNotExists();
|
leaseCoordinator.initialize();
|
||||||
verify(leaseRefresher, times(1)).waitUntilLeaseTableExists(SECONDS_BETWEEN_POLLS, TIMEOUT_SECONDS);
|
} finally {
|
||||||
|
verify(leaseRefresher).createLeaseTableIfNotExists();
|
||||||
|
verify(leaseRefresher).waitUntilLeaseTableExists(SECONDS_BETWEEN_POLLS, TIMEOUT_SECONDS);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Validates a {@link NullPointerException} is not thrown when the lease taker
|
||||||
|
* is stopped before it starts/exists.
|
||||||
|
*
|
||||||
|
* @see <a href="https://github.com/awslabs/amazon-kinesis-client/issues/745">issue #745</a>
|
||||||
|
* @see <a href="https://github.com/awslabs/amazon-kinesis-client/issues/900">issue #900</a>
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testStopLeaseTakerBeforeStart() {
|
||||||
|
leaseCoordinator.stopLeaseTaker();
|
||||||
|
assertTrue(leaseCoordinator.getAssignments().isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue