Added more unit tests.
This commit is contained in:
parent
05a678f59a
commit
b27544e71d
4 changed files with 207 additions and 12 deletions
|
|
@ -29,7 +29,7 @@ public interface LeaseRefresher {
|
|||
|
||||
/**
|
||||
* Creates the table that will store leases. Succeeds if table already exists.
|
||||
* Deprecated. Use createLeaseTableIfNotExists().
|
||||
* Deprecated. Use {@link #createLeaseTableIfNotExists()}.
|
||||
*
|
||||
* @param readCapacity
|
||||
* @param writeCapacity
|
||||
|
|
@ -45,7 +45,8 @@ public interface LeaseRefresher {
|
|||
throws ProvisionedThroughputException, DependencyException;
|
||||
|
||||
/**
|
||||
* Creates the table that will store leases. Succeeds if table already exists.
|
||||
* Creates the table that will store leases. Table is now created in PayPerRequest billing mode by default.
|
||||
* Succeeds if table already exists.
|
||||
*
|
||||
* @return true if we created a new table (table didn't exist before)
|
||||
*
|
||||
|
|
|
|||
|
|
@ -162,7 +162,6 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
|
|||
@Override
|
||||
public boolean createLeaseTableIfNotExists(@NonNull final Long readCapacity, @NonNull final Long writeCapacity)
|
||||
throws ProvisionedThroughputException, DependencyException {
|
||||
// DynamoDB is now created in PayPerRequest billing mode by default. Keeping this for backward compatibility.
|
||||
ProvisionedThroughput throughput = ProvisionedThroughput.builder().readCapacityUnits(readCapacity)
|
||||
.writeCapacityUnits(writeCapacity).build();
|
||||
final CreateTableRequest request;
|
||||
|
|
|
|||
|
|
@ -0,0 +1,67 @@
|
|||
package software.amazon.kinesis.leases.dynamodb;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.runners.MockitoJUnitRunner;
|
||||
import software.amazon.kinesis.leases.LeaseRefresher;
|
||||
import software.amazon.kinesis.leases.exceptions.DependencyException;
|
||||
import software.amazon.kinesis.metrics.MetricsFactory;
|
||||
|
||||
import java.util.UUID;
|
||||
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
public class DynamoDBLeaseCoordinatorTest {
|
||||
|
||||
private static final String WORKER_ID = UUID.randomUUID().toString();
|
||||
private static final long LEASE_DURATION_MILLIS = 5000L;
|
||||
private static final long EPSILON_MILLIS = 25L;
|
||||
private static final int MAX_LEASES_FOR_WORKER = Integer.MAX_VALUE;
|
||||
private static final int MAX_LEASES_TO_STEAL_AT_ONE_TIME = 1;
|
||||
private static final int MAX_LEASE_RENEWER_THREAD_COUNT = 20;
|
||||
private static final long INITIAL_LEASE_TABLE_READ_CAPACITY = 10L;
|
||||
private static final long INITIAL_LEASE_TABLE_WRITE_CAPACITY = 10L;
|
||||
private static final long SECONDS_BETWEEN_POLLS = 10L;
|
||||
private static final long TIMEOUT_SECONDS = 600L;
|
||||
|
||||
@Mock
|
||||
private LeaseRefresher leaseRefresher;
|
||||
@Mock
|
||||
private MetricsFactory metricsFactory;
|
||||
|
||||
private DynamoDBLeaseCoordinator leaseCoordinator;
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
this.leaseCoordinator = new DynamoDBLeaseCoordinator(leaseRefresher, WORKER_ID, LEASE_DURATION_MILLIS,
|
||||
EPSILON_MILLIS, MAX_LEASES_FOR_WORKER, MAX_LEASES_TO_STEAL_AT_ONE_TIME, MAX_LEASE_RENEWER_THREAD_COUNT,
|
||||
INITIAL_LEASE_TABLE_READ_CAPACITY, INITIAL_LEASE_TABLE_WRITE_CAPACITY, metricsFactory);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInitialize_tableCreationSucceeds() throws Exception {
|
||||
when(leaseRefresher.createLeaseTableIfNotExists()).thenReturn(true);
|
||||
when(leaseRefresher.waitUntilLeaseTableExists(SECONDS_BETWEEN_POLLS, TIMEOUT_SECONDS)).thenReturn(true);
|
||||
|
||||
leaseCoordinator.initialize();
|
||||
|
||||
verify(leaseRefresher, times(1)).createLeaseTableIfNotExists();
|
||||
verify(leaseRefresher, times(1)).waitUntilLeaseTableExists(SECONDS_BETWEEN_POLLS, TIMEOUT_SECONDS);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInitialize_tableCreationFails() throws Exception {
|
||||
when(leaseRefresher.createLeaseTableIfNotExists()).thenReturn(false);
|
||||
when(leaseRefresher.waitUntilLeaseTableExists(SECONDS_BETWEEN_POLLS, TIMEOUT_SECONDS)).thenReturn(false);
|
||||
|
||||
Assert.assertThrows(DependencyException.class, () -> leaseCoordinator.initialize());
|
||||
verify(leaseRefresher, times(1)).createLeaseTableIfNotExists();
|
||||
verify(leaseRefresher, times(1)).waitUntilLeaseTableExists(SECONDS_BETWEEN_POLLS, TIMEOUT_SECONDS);
|
||||
}
|
||||
}
|
||||
|
|
@ -21,6 +21,8 @@ import static org.mockito.Matchers.any;
|
|||
import static org.mockito.Matchers.anyBoolean;
|
||||
import static org.mockito.Matchers.anyLong;
|
||||
import static org.mockito.Matchers.anyString;
|
||||
import static org.mockito.Matchers.eq;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
|
@ -50,10 +52,14 @@ import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
|
|||
import software.amazon.awssdk.services.dynamodb.model.DeleteItemResponse;
|
||||
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
|
||||
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
|
||||
import software.amazon.awssdk.services.dynamodb.model.DynamoDbException;
|
||||
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
|
||||
import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
|
||||
import software.amazon.awssdk.services.dynamodb.model.LimitExceededException;
|
||||
import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput;
|
||||
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
|
||||
import software.amazon.awssdk.services.dynamodb.model.PutItemResponse;
|
||||
import software.amazon.awssdk.services.dynamodb.model.ResourceInUseException;
|
||||
import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
|
||||
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
|
||||
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
|
||||
|
|
@ -67,6 +73,8 @@ import software.amazon.kinesis.leases.Lease;
|
|||
import software.amazon.kinesis.leases.LeaseManagementConfig;
|
||||
import software.amazon.kinesis.leases.LeaseSerializer;
|
||||
import software.amazon.kinesis.leases.exceptions.DependencyException;
|
||||
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
|
||||
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
public class DynamoDBLeaseRefresherTest {
|
||||
|
||||
|
|
@ -100,6 +108,8 @@ public class DynamoDBLeaseRefresherTest {
|
|||
public ExpectedException expectedException = ExpectedException.none();
|
||||
|
||||
private DynamoDBLeaseRefresher leaseRefresher;
|
||||
private DescribeTableRequest describeTableRequest;
|
||||
private CreateTableRequest createTableRequest;
|
||||
|
||||
private Map<String, AttributeValue> serializedLease;
|
||||
|
||||
|
|
@ -109,6 +119,13 @@ public class DynamoDBLeaseRefresherTest {
|
|||
tableCreatorCallback);
|
||||
serializedLease = new HashMap<>();
|
||||
|
||||
describeTableRequest = DescribeTableRequest.builder().tableName(TABLE_NAME).build();
|
||||
createTableRequest = CreateTableRequest.builder()
|
||||
.tableName(TABLE_NAME)
|
||||
.keySchema(leaseSerializer.getKeySchema())
|
||||
.attributeDefinitions(leaseSerializer.getAttributeDefinitions())
|
||||
.billingMode(BillingMode.PAY_PER_REQUEST)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -270,32 +287,143 @@ public class DynamoDBLeaseRefresherTest {
|
|||
leaseRefresher = new DynamoDBLeaseRefresher(TABLE_NAME, dynamoDbClient, leaseSerializer, CONSISTENT_READS,
|
||||
tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, BillingMode.PROVISIONED);
|
||||
|
||||
when(dynamoDbClient.describeTable(any(DescribeTableRequest.class))).thenReturn(mockDescribeTableFuture);
|
||||
when(mockDescribeTableFuture.get(anyLong(), any()))
|
||||
when(dynamoDbClient.describeTable(describeTableRequest)).thenReturn(mockDescribeTableFuture);
|
||||
when(mockDescribeTableFuture.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS)))
|
||||
.thenThrow(ResourceNotFoundException.builder().message("Table doesn't exist").build());
|
||||
|
||||
when(dynamoDbClient.createTable(any(CreateTableRequest.class))).thenReturn(mockCreateTableFuture);
|
||||
when(mockCreateTableFuture.get(anyLong(), any())).thenReturn(null);
|
||||
final ProvisionedThroughput throughput = ProvisionedThroughput.builder().readCapacityUnits(10L)
|
||||
.writeCapacityUnits(10L).build();
|
||||
final CreateTableRequest createTableRequest = CreateTableRequest.builder()
|
||||
.tableName(TABLE_NAME)
|
||||
.keySchema(leaseSerializer.getKeySchema())
|
||||
.attributeDefinitions(leaseSerializer.getAttributeDefinitions())
|
||||
.provisionedThroughput(throughput)
|
||||
.build();
|
||||
when(dynamoDbClient.createTable(createTableRequest)).thenReturn(mockCreateTableFuture);
|
||||
when(mockCreateTableFuture.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS)))
|
||||
.thenReturn(null);
|
||||
|
||||
final boolean result = leaseRefresher.createLeaseTableIfNotExists(10L, 10L);
|
||||
|
||||
verify(dynamoDbClient, times(1)).describeTable(describeTableRequest);
|
||||
verify(dynamoDbClient, times(1)).createTable(createTableRequest);
|
||||
verify(mockDescribeTableFuture, times(1))
|
||||
.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS));
|
||||
verify(mockCreateTableFuture, times(1))
|
||||
.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS));
|
||||
Assert.assertTrue(result);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateLeaseTableIfNotExists() throws Exception {
|
||||
when(dynamoDbClient.describeTable(any(DescribeTableRequest.class))).thenReturn(mockDescribeTableFuture);
|
||||
when(mockDescribeTableFuture.get(anyLong(), any()))
|
||||
when(dynamoDbClient.describeTable(describeTableRequest)).thenReturn(mockDescribeTableFuture);
|
||||
when(mockDescribeTableFuture.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS)))
|
||||
.thenThrow(ResourceNotFoundException.builder().message("Table doesn't exist").build());
|
||||
|
||||
when(dynamoDbClient.createTable(any(CreateTableRequest.class))).thenReturn(mockCreateTableFuture);
|
||||
when(mockCreateTableFuture.get(anyLong(), any())).thenReturn(null);
|
||||
when(dynamoDbClient.createTable(createTableRequest)).thenReturn(mockCreateTableFuture);
|
||||
when(mockCreateTableFuture.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS)))
|
||||
.thenReturn(null);
|
||||
|
||||
final boolean result = leaseRefresher.createLeaseTableIfNotExists();
|
||||
|
||||
verify(dynamoDbClient, times(1)).describeTable(describeTableRequest);
|
||||
verify(dynamoDbClient, times(1)).createTable(createTableRequest);
|
||||
verify(mockDescribeTableFuture, times(1))
|
||||
.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS));
|
||||
verify(mockCreateTableFuture, times(1))
|
||||
.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS));
|
||||
Assert.assertTrue(result);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateLeaseTableIfNotExists_throwsDependencyException() throws Exception {
|
||||
when(dynamoDbClient.describeTable(describeTableRequest)).thenReturn(mockDescribeTableFuture);
|
||||
when(mockDescribeTableFuture.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS)))
|
||||
.thenThrow(new InterruptedException());
|
||||
when(dynamoDbClient.createTable(createTableRequest)).thenReturn(mockCreateTableFuture);
|
||||
when(mockCreateTableFuture.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS)))
|
||||
.thenThrow(ResourceInUseException.builder().message("Table already exists").build());
|
||||
|
||||
Assert.assertFalse(leaseRefresher.createLeaseTableIfNotExists());
|
||||
verify(dynamoDbClient, times(1)).describeTable(describeTableRequest);
|
||||
verify(dynamoDbClient, times(1)).createTable(createTableRequest);
|
||||
verify(mockDescribeTableFuture, times(1))
|
||||
.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS));
|
||||
verify(mockCreateTableFuture, times(1))
|
||||
.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateLeaseTableIfNotExists_tableAlreadyExists_throwsResourceInUseException_expectFalse() throws Exception {
|
||||
when(dynamoDbClient.describeTable(describeTableRequest)).thenReturn(mockDescribeTableFuture);
|
||||
when(mockDescribeTableFuture.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS)))
|
||||
.thenThrow(ResourceNotFoundException.builder().message("Table doesn't exist").build());
|
||||
when(dynamoDbClient.createTable(createTableRequest)).thenReturn(mockCreateTableFuture);
|
||||
when(mockCreateTableFuture.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS)))
|
||||
.thenThrow(ResourceInUseException.builder().message("Table already exists").build());
|
||||
|
||||
Assert.assertFalse(leaseRefresher.createLeaseTableIfNotExists());
|
||||
verify(dynamoDbClient, times(1)).describeTable(describeTableRequest);
|
||||
verify(dynamoDbClient, times(1)).createTable(createTableRequest);
|
||||
verify(mockDescribeTableFuture, times(1))
|
||||
.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS));
|
||||
verify(mockCreateTableFuture, times(1))
|
||||
.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateLeaseTableIfNotExists_throwsLimitExceededException_expectProvisionedThroughputException() throws Exception {
|
||||
when(dynamoDbClient.describeTable(describeTableRequest)).thenReturn(mockDescribeTableFuture);
|
||||
when(mockDescribeTableFuture.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS)))
|
||||
.thenThrow(ResourceNotFoundException.builder().message("Table doesn't exist").build());
|
||||
when(dynamoDbClient.createTable(createTableRequest)).thenReturn(mockCreateTableFuture);
|
||||
when(mockCreateTableFuture.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS)))
|
||||
.thenThrow(LimitExceededException.builder().build());
|
||||
|
||||
Assert.assertThrows(ProvisionedThroughputException.class, () -> leaseRefresher.createLeaseTableIfNotExists());
|
||||
verify(dynamoDbClient, times(1)).describeTable(describeTableRequest);
|
||||
verify(dynamoDbClient, times(1)).createTable(createTableRequest);
|
||||
verify(mockDescribeTableFuture, times(1))
|
||||
.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS));
|
||||
verify(mockCreateTableFuture, times(1))
|
||||
.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateLeaseTableIfNotExists_throwsDynamoDbException_expectDependencyException() throws Exception {
|
||||
when(dynamoDbClient.describeTable(describeTableRequest)).thenReturn(mockDescribeTableFuture);
|
||||
when(mockDescribeTableFuture.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS)))
|
||||
.thenThrow(ResourceNotFoundException.builder().message("Table doesn't exist").build());
|
||||
when(dynamoDbClient.createTable(createTableRequest)).thenReturn(mockCreateTableFuture);
|
||||
when(mockCreateTableFuture.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS)))
|
||||
.thenThrow(DynamoDbException.builder().build());
|
||||
|
||||
Assert.assertThrows(DependencyException.class, () -> leaseRefresher.createLeaseTableIfNotExists());
|
||||
verify(dynamoDbClient, times(1)).describeTable(describeTableRequest);
|
||||
verify(dynamoDbClient, times(1)).createTable(createTableRequest);
|
||||
verify(mockDescribeTableFuture, times(1))
|
||||
.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS));
|
||||
verify(mockCreateTableFuture, times(1))
|
||||
.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateLeaseTableIfNotExists_throwsTimeoutException_expectDependencyException() throws Exception {
|
||||
when(dynamoDbClient.describeTable(describeTableRequest)).thenReturn(mockDescribeTableFuture);
|
||||
when(mockDescribeTableFuture.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS)))
|
||||
.thenThrow(ResourceNotFoundException.builder().message("Table doesn't exist").build());
|
||||
when(dynamoDbClient.createTable(createTableRequest)).thenReturn(mockCreateTableFuture);
|
||||
when(mockCreateTableFuture.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS)))
|
||||
.thenThrow(new TimeoutException());
|
||||
|
||||
Assert.assertThrows(DependencyException.class, () -> leaseRefresher.createLeaseTableIfNotExists());
|
||||
verify(dynamoDbClient, times(1)).describeTable(describeTableRequest);
|
||||
verify(dynamoDbClient, times(1)).createTable(createTableRequest);
|
||||
verify(mockDescribeTableFuture, times(1))
|
||||
.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS));
|
||||
verify(mockCreateTableFuture, times(1))
|
||||
.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateLeaseTableProvisionedBillingModeTimesOut() throws Exception {
|
||||
leaseRefresher = new DynamoDBLeaseRefresher(TABLE_NAME, dynamoDbClient, leaseSerializer, CONSISTENT_READS,
|
||||
|
|
|
|||
Loading…
Reference in a new issue