From b27544e71de9b221c37f31420fefe42a373d0660 Mon Sep 17 00:00:00 2001 From: Sachin Sundar P S Date: Wed, 6 Oct 2021 15:47:33 -0700 Subject: [PATCH] Added more unit tests. --- .../amazon/kinesis/leases/LeaseRefresher.java | 5 +- .../dynamodb/DynamoDBLeaseRefresher.java | 1 - .../DynamoDBLeaseCoordinatorTest.java | 67 ++++++++ .../dynamodb/DynamoDBLeaseRefresherTest.java | 146 ++++++++++++++++-- 4 files changed, 207 insertions(+), 12 deletions(-) create mode 100644 amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorTest.java diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java index 37496afa..2fca59c7 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java @@ -29,7 +29,7 @@ public interface LeaseRefresher { /** * Creates the table that will store leases. Succeeds if table already exists. - * Deprecated. Use createLeaseTableIfNotExists(). + * Deprecated. Use {@link #createLeaseTableIfNotExists()}. * * @param readCapacity * @param writeCapacity @@ -45,7 +45,8 @@ public interface LeaseRefresher { throws ProvisionedThroughputException, DependencyException; /** - * Creates the table that will store leases. Succeeds if table already exists. + * Creates the table that will store leases. Table is now created in PayPerRequest billing mode by default. + * Succeeds if table already exists. * * @return true if we created a new table (table didn't exist before) * diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java index 74f82d6e..361db9f9 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java @@ -162,7 +162,6 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { @Override public boolean createLeaseTableIfNotExists(@NonNull final Long readCapacity, @NonNull final Long writeCapacity) throws ProvisionedThroughputException, DependencyException { - // DynamoDB is now created in PayPerRequest billing mode by default. Keeping this for backward compatibility. ProvisionedThroughput throughput = ProvisionedThroughput.builder().readCapacityUnits(readCapacity) .writeCapacityUnits(writeCapacity).build(); final CreateTableRequest request; diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorTest.java new file mode 100644 index 00000000..caa7a6c7 --- /dev/null +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorTest.java @@ -0,0 +1,67 @@ +package software.amazon.kinesis.leases.dynamodb; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; +import software.amazon.kinesis.leases.LeaseRefresher; +import software.amazon.kinesis.leases.exceptions.DependencyException; +import software.amazon.kinesis.metrics.MetricsFactory; + +import java.util.UUID; + +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class DynamoDBLeaseCoordinatorTest { + + private static final String WORKER_ID = UUID.randomUUID().toString(); + private static final long LEASE_DURATION_MILLIS = 5000L; + private static final long EPSILON_MILLIS = 25L; + private static final int MAX_LEASES_FOR_WORKER = Integer.MAX_VALUE; + private static final int MAX_LEASES_TO_STEAL_AT_ONE_TIME = 1; + private static final int MAX_LEASE_RENEWER_THREAD_COUNT = 20; + private static final long INITIAL_LEASE_TABLE_READ_CAPACITY = 10L; + private static final long INITIAL_LEASE_TABLE_WRITE_CAPACITY = 10L; + private static final long SECONDS_BETWEEN_POLLS = 10L; + private static final long TIMEOUT_SECONDS = 600L; + + @Mock + private LeaseRefresher leaseRefresher; + @Mock + private MetricsFactory metricsFactory; + + private DynamoDBLeaseCoordinator leaseCoordinator; + + @Before + public void setup() { + this.leaseCoordinator = new DynamoDBLeaseCoordinator(leaseRefresher, WORKER_ID, LEASE_DURATION_MILLIS, + EPSILON_MILLIS, MAX_LEASES_FOR_WORKER, MAX_LEASES_TO_STEAL_AT_ONE_TIME, MAX_LEASE_RENEWER_THREAD_COUNT, + INITIAL_LEASE_TABLE_READ_CAPACITY, INITIAL_LEASE_TABLE_WRITE_CAPACITY, metricsFactory); + } + + @Test + public void testInitialize_tableCreationSucceeds() throws Exception { + when(leaseRefresher.createLeaseTableIfNotExists()).thenReturn(true); + when(leaseRefresher.waitUntilLeaseTableExists(SECONDS_BETWEEN_POLLS, TIMEOUT_SECONDS)).thenReturn(true); + + leaseCoordinator.initialize(); + + verify(leaseRefresher, times(1)).createLeaseTableIfNotExists(); + verify(leaseRefresher, times(1)).waitUntilLeaseTableExists(SECONDS_BETWEEN_POLLS, TIMEOUT_SECONDS); + } + + @Test + public void testInitialize_tableCreationFails() throws Exception { + when(leaseRefresher.createLeaseTableIfNotExists()).thenReturn(false); + when(leaseRefresher.waitUntilLeaseTableExists(SECONDS_BETWEEN_POLLS, TIMEOUT_SECONDS)).thenReturn(false); + + Assert.assertThrows(DependencyException.class, () -> leaseCoordinator.initialize()); + verify(leaseRefresher, times(1)).createLeaseTableIfNotExists(); + verify(leaseRefresher, times(1)).waitUntilLeaseTableExists(SECONDS_BETWEEN_POLLS, TIMEOUT_SECONDS); + } +} diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java index 664f91ef..beed73f2 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java @@ -21,6 +21,8 @@ import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -50,10 +52,14 @@ import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest; import software.amazon.awssdk.services.dynamodb.model.DeleteItemResponse; import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest; import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse; +import software.amazon.awssdk.services.dynamodb.model.DynamoDbException; import software.amazon.awssdk.services.dynamodb.model.GetItemRequest; import software.amazon.awssdk.services.dynamodb.model.GetItemResponse; +import software.amazon.awssdk.services.dynamodb.model.LimitExceededException; +import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput; import software.amazon.awssdk.services.dynamodb.model.PutItemRequest; import software.amazon.awssdk.services.dynamodb.model.PutItemResponse; +import software.amazon.awssdk.services.dynamodb.model.ResourceInUseException; import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException; import software.amazon.awssdk.services.dynamodb.model.ScanRequest; import software.amazon.awssdk.services.dynamodb.model.ScanResponse; @@ -67,6 +73,8 @@ import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.LeaseManagementConfig; import software.amazon.kinesis.leases.LeaseSerializer; import software.amazon.kinesis.leases.exceptions.DependencyException; +import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; + @RunWith(MockitoJUnitRunner.class) public class DynamoDBLeaseRefresherTest { @@ -100,6 +108,8 @@ public class DynamoDBLeaseRefresherTest { public ExpectedException expectedException = ExpectedException.none(); private DynamoDBLeaseRefresher leaseRefresher; + private DescribeTableRequest describeTableRequest; + private CreateTableRequest createTableRequest; private Map serializedLease; @@ -109,6 +119,13 @@ public class DynamoDBLeaseRefresherTest { tableCreatorCallback); serializedLease = new HashMap<>(); + describeTableRequest = DescribeTableRequest.builder().tableName(TABLE_NAME).build(); + createTableRequest = CreateTableRequest.builder() + .tableName(TABLE_NAME) + .keySchema(leaseSerializer.getKeySchema()) + .attributeDefinitions(leaseSerializer.getAttributeDefinitions()) + .billingMode(BillingMode.PAY_PER_REQUEST) + .build(); } @Test @@ -270,32 +287,143 @@ public class DynamoDBLeaseRefresherTest { leaseRefresher = new DynamoDBLeaseRefresher(TABLE_NAME, dynamoDbClient, leaseSerializer, CONSISTENT_READS, tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, BillingMode.PROVISIONED); - when(dynamoDbClient.describeTable(any(DescribeTableRequest.class))).thenReturn(mockDescribeTableFuture); - when(mockDescribeTableFuture.get(anyLong(), any())) + when(dynamoDbClient.describeTable(describeTableRequest)).thenReturn(mockDescribeTableFuture); + when(mockDescribeTableFuture.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS))) .thenThrow(ResourceNotFoundException.builder().message("Table doesn't exist").build()); - when(dynamoDbClient.createTable(any(CreateTableRequest.class))).thenReturn(mockCreateTableFuture); - when(mockCreateTableFuture.get(anyLong(), any())).thenReturn(null); + final ProvisionedThroughput throughput = ProvisionedThroughput.builder().readCapacityUnits(10L) + .writeCapacityUnits(10L).build(); + final CreateTableRequest createTableRequest = CreateTableRequest.builder() + .tableName(TABLE_NAME) + .keySchema(leaseSerializer.getKeySchema()) + .attributeDefinitions(leaseSerializer.getAttributeDefinitions()) + .provisionedThroughput(throughput) + .build(); + when(dynamoDbClient.createTable(createTableRequest)).thenReturn(mockCreateTableFuture); + when(mockCreateTableFuture.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS))) + .thenReturn(null); final boolean result = leaseRefresher.createLeaseTableIfNotExists(10L, 10L); + verify(dynamoDbClient, times(1)).describeTable(describeTableRequest); + verify(dynamoDbClient, times(1)).createTable(createTableRequest); + verify(mockDescribeTableFuture, times(1)) + .get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS)); + verify(mockCreateTableFuture, times(1)) + .get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS)); Assert.assertTrue(result); } @Test public void testCreateLeaseTableIfNotExists() throws Exception { - when(dynamoDbClient.describeTable(any(DescribeTableRequest.class))).thenReturn(mockDescribeTableFuture); - when(mockDescribeTableFuture.get(anyLong(), any())) + when(dynamoDbClient.describeTable(describeTableRequest)).thenReturn(mockDescribeTableFuture); + when(mockDescribeTableFuture.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS))) .thenThrow(ResourceNotFoundException.builder().message("Table doesn't exist").build()); - - when(dynamoDbClient.createTable(any(CreateTableRequest.class))).thenReturn(mockCreateTableFuture); - when(mockCreateTableFuture.get(anyLong(), any())).thenReturn(null); + when(dynamoDbClient.createTable(createTableRequest)).thenReturn(mockCreateTableFuture); + when(mockCreateTableFuture.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS))) + .thenReturn(null); final boolean result = leaseRefresher.createLeaseTableIfNotExists(); + verify(dynamoDbClient, times(1)).describeTable(describeTableRequest); + verify(dynamoDbClient, times(1)).createTable(createTableRequest); + verify(mockDescribeTableFuture, times(1)) + .get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS)); + verify(mockCreateTableFuture, times(1)) + .get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS)); Assert.assertTrue(result); } + @Test + public void testCreateLeaseTableIfNotExists_throwsDependencyException() throws Exception { + when(dynamoDbClient.describeTable(describeTableRequest)).thenReturn(mockDescribeTableFuture); + when(mockDescribeTableFuture.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS))) + .thenThrow(new InterruptedException()); + when(dynamoDbClient.createTable(createTableRequest)).thenReturn(mockCreateTableFuture); + when(mockCreateTableFuture.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS))) + .thenThrow(ResourceInUseException.builder().message("Table already exists").build()); + + Assert.assertFalse(leaseRefresher.createLeaseTableIfNotExists()); + verify(dynamoDbClient, times(1)).describeTable(describeTableRequest); + verify(dynamoDbClient, times(1)).createTable(createTableRequest); + verify(mockDescribeTableFuture, times(1)) + .get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS)); + verify(mockCreateTableFuture, times(1)) + .get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS)); + } + + @Test + public void testCreateLeaseTableIfNotExists_tableAlreadyExists_throwsResourceInUseException_expectFalse() throws Exception { + when(dynamoDbClient.describeTable(describeTableRequest)).thenReturn(mockDescribeTableFuture); + when(mockDescribeTableFuture.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS))) + .thenThrow(ResourceNotFoundException.builder().message("Table doesn't exist").build()); + when(dynamoDbClient.createTable(createTableRequest)).thenReturn(mockCreateTableFuture); + when(mockCreateTableFuture.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS))) + .thenThrow(ResourceInUseException.builder().message("Table already exists").build()); + + Assert.assertFalse(leaseRefresher.createLeaseTableIfNotExists()); + verify(dynamoDbClient, times(1)).describeTable(describeTableRequest); + verify(dynamoDbClient, times(1)).createTable(createTableRequest); + verify(mockDescribeTableFuture, times(1)) + .get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS)); + verify(mockCreateTableFuture, times(1)) + .get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS)); + } + + @Test + public void testCreateLeaseTableIfNotExists_throwsLimitExceededException_expectProvisionedThroughputException() throws Exception { + when(dynamoDbClient.describeTable(describeTableRequest)).thenReturn(mockDescribeTableFuture); + when(mockDescribeTableFuture.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS))) + .thenThrow(ResourceNotFoundException.builder().message("Table doesn't exist").build()); + when(dynamoDbClient.createTable(createTableRequest)).thenReturn(mockCreateTableFuture); + when(mockCreateTableFuture.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS))) + .thenThrow(LimitExceededException.builder().build()); + + Assert.assertThrows(ProvisionedThroughputException.class, () -> leaseRefresher.createLeaseTableIfNotExists()); + verify(dynamoDbClient, times(1)).describeTable(describeTableRequest); + verify(dynamoDbClient, times(1)).createTable(createTableRequest); + verify(mockDescribeTableFuture, times(1)) + .get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS)); + verify(mockCreateTableFuture, times(1)) + .get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS)); + } + + @Test + public void testCreateLeaseTableIfNotExists_throwsDynamoDbException_expectDependencyException() throws Exception { + when(dynamoDbClient.describeTable(describeTableRequest)).thenReturn(mockDescribeTableFuture); + when(mockDescribeTableFuture.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS))) + .thenThrow(ResourceNotFoundException.builder().message("Table doesn't exist").build()); + when(dynamoDbClient.createTable(createTableRequest)).thenReturn(mockCreateTableFuture); + when(mockCreateTableFuture.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS))) + .thenThrow(DynamoDbException.builder().build()); + + Assert.assertThrows(DependencyException.class, () -> leaseRefresher.createLeaseTableIfNotExists()); + verify(dynamoDbClient, times(1)).describeTable(describeTableRequest); + verify(dynamoDbClient, times(1)).createTable(createTableRequest); + verify(mockDescribeTableFuture, times(1)) + .get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS)); + verify(mockCreateTableFuture, times(1)) + .get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS)); + } + + @Test + public void testCreateLeaseTableIfNotExists_throwsTimeoutException_expectDependencyException() throws Exception { + when(dynamoDbClient.describeTable(describeTableRequest)).thenReturn(mockDescribeTableFuture); + when(mockDescribeTableFuture.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS))) + .thenThrow(ResourceNotFoundException.builder().message("Table doesn't exist").build()); + when(dynamoDbClient.createTable(createTableRequest)).thenReturn(mockCreateTableFuture); + when(mockCreateTableFuture.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS))) + .thenThrow(new TimeoutException()); + + Assert.assertThrows(DependencyException.class, () -> leaseRefresher.createLeaseTableIfNotExists()); + verify(dynamoDbClient, times(1)).describeTable(describeTableRequest); + verify(dynamoDbClient, times(1)).createTable(createTableRequest); + verify(mockDescribeTableFuture, times(1)) + .get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS)); + verify(mockCreateTableFuture, times(1)) + .get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS)); + } + @Test public void testCreateLeaseTableProvisionedBillingModeTimesOut() throws Exception { leaseRefresher = new DynamoDBLeaseRefresher(TABLE_NAME, dynamoDbClient, leaseSerializer, CONSISTENT_READS,