diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java index 8290aefd..74f82d6e 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java @@ -38,6 +38,7 @@ import software.amazon.awssdk.services.dynamodb.model.DynamoDbException; import software.amazon.awssdk.services.dynamodb.model.GetItemRequest; import software.amazon.awssdk.services.dynamodb.model.GetItemResponse; import software.amazon.awssdk.services.dynamodb.model.LimitExceededException; +import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput; import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughputExceededException; import software.amazon.awssdk.services.dynamodb.model.PutItemRequest; import software.amazon.awssdk.services.dynamodb.model.ResourceInUseException; @@ -162,7 +163,20 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { public boolean createLeaseTableIfNotExists(@NonNull final Long readCapacity, @NonNull final Long writeCapacity) throws ProvisionedThroughputException, DependencyException { // DynamoDB is now created in PayPerRequest billing mode by default. Keeping this for backward compatibility. - return createLeaseTableIfNotExists(); + ProvisionedThroughput throughput = ProvisionedThroughput.builder().readCapacityUnits(readCapacity) + .writeCapacityUnits(writeCapacity).build(); + final CreateTableRequest request; + if(BillingMode.PAY_PER_REQUEST.equals(billingMode)){ + request = CreateTableRequest.builder().tableName(table).keySchema(serializer.getKeySchema()) + .attributeDefinitions(serializer.getAttributeDefinitions()) + .billingMode(billingMode).build(); + }else{ + request = CreateTableRequest.builder().tableName(table).keySchema(serializer.getKeySchema()) + .attributeDefinitions(serializer.getAttributeDefinitions()).provisionedThroughput(throughput) + .build(); + } + + return createTableIfNotExists(request); } /** @@ -171,6 +185,15 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { @Override public boolean createLeaseTableIfNotExists() throws ProvisionedThroughputException, DependencyException { + final CreateTableRequest request = CreateTableRequest.builder().tableName(table).keySchema(serializer.getKeySchema()) + .attributeDefinitions(serializer.getAttributeDefinitions()) + .billingMode(billingMode).build(); + + return createTableIfNotExists(request); + } + + private boolean createTableIfNotExists(CreateTableRequest request) + throws ProvisionedThroughputException, DependencyException { try { if (tableStatus() != null) { return newTableCreated; @@ -181,9 +204,6 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { // log.error("Failed to get table status for {}", table, de); } - final CreateTableRequest request = CreateTableRequest.builder().tableName(table).keySchema(serializer.getKeySchema()) - .attributeDefinitions(serializer.getAttributeDefinitions()) - .billingMode(billingMode).build(); final AWSExceptionManager exceptionManager = createExceptionManager(); exceptionManager.add(ResourceInUseException.class, t -> t); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ExceptionThrowingLeaseRefresher.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ExceptionThrowingLeaseRefresher.java index 0d418ca3..5e612ade 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ExceptionThrowingLeaseRefresher.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ExceptionThrowingLeaseRefresher.java @@ -111,7 +111,10 @@ public class ExceptionThrowingLeaseRefresher implements LeaseRefresher { @Override public boolean createLeaseTableIfNotExists(Long readCapacity, Long writeCapacity) throws ProvisionedThroughputException, DependencyException { - return createLeaseTableIfNotExists(); + throwExceptions("createLeaseTableIfNotExists", + ExceptionThrowingLeaseRefresherMethods.CREATELEASETABLEIFNOTEXISTS); + + return leaseRefresher.createLeaseTableIfNotExists(readCapacity, writeCapacity); } @Override diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java index 5188c41b..664f91ef 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java @@ -33,6 +33,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -265,7 +266,40 @@ public class DynamoDBLeaseRefresherTest { } @Test - public void testCreateLeaseTableTimesOut() throws Exception { + public void testCreateLeaseTableProvisionedBillingModeIfNotExists() throws Exception { + leaseRefresher = new DynamoDBLeaseRefresher(TABLE_NAME, dynamoDbClient, leaseSerializer, CONSISTENT_READS, + tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, BillingMode.PROVISIONED); + + when(dynamoDbClient.describeTable(any(DescribeTableRequest.class))).thenReturn(mockDescribeTableFuture); + when(mockDescribeTableFuture.get(anyLong(), any())) + .thenThrow(ResourceNotFoundException.builder().message("Table doesn't exist").build()); + + when(dynamoDbClient.createTable(any(CreateTableRequest.class))).thenReturn(mockCreateTableFuture); + when(mockCreateTableFuture.get(anyLong(), any())).thenReturn(null); + + final boolean result = leaseRefresher.createLeaseTableIfNotExists(10L, 10L); + + Assert.assertTrue(result); + } + + @Test + public void testCreateLeaseTableIfNotExists() throws Exception { + when(dynamoDbClient.describeTable(any(DescribeTableRequest.class))).thenReturn(mockDescribeTableFuture); + when(mockDescribeTableFuture.get(anyLong(), any())) + .thenThrow(ResourceNotFoundException.builder().message("Table doesn't exist").build()); + + when(dynamoDbClient.createTable(any(CreateTableRequest.class))).thenReturn(mockCreateTableFuture); + when(mockCreateTableFuture.get(anyLong(), any())).thenReturn(null); + + final boolean result = leaseRefresher.createLeaseTableIfNotExists(); + + Assert.assertTrue(result); + } + + @Test + public void testCreateLeaseTableProvisionedBillingModeTimesOut() throws Exception { + leaseRefresher = new DynamoDBLeaseRefresher(TABLE_NAME, dynamoDbClient, leaseSerializer, CONSISTENT_READS, + tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, BillingMode.PROVISIONED); TimeoutException te = setRuleForDependencyTimeout(); when(dynamoDbClient.describeTable(any(DescribeTableRequest.class))).thenReturn(mockDescribeTableFuture); @@ -279,10 +313,7 @@ public class DynamoDBLeaseRefresherTest { } @Test - public void testCreateLeaseTableBillingMode() throws Exception { - leaseRefresher = new DynamoDBLeaseRefresher(TABLE_NAME, dynamoDbClient, leaseSerializer, CONSISTENT_READS, - tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, BillingMode.PAY_PER_REQUEST); - + public void testCreateLeaseTableTimesOut() throws Exception { TimeoutException te = setRuleForDependencyTimeout(); when(dynamoDbClient.describeTable(any(DescribeTableRequest.class))).thenReturn(mockDescribeTableFuture); @@ -292,7 +323,7 @@ public class DynamoDBLeaseRefresherTest { when(dynamoDbClient.createTable(any(CreateTableRequest.class))).thenReturn(mockCreateTableFuture); when(mockCreateTableFuture.get(anyLong(), any())).thenThrow(te); - verifyCancel(mockCreateTableFuture, () -> leaseRefresher.createLeaseTableIfNotExists(10L, 10L)); + verifyCancel(mockCreateTableFuture, () -> leaseRefresher.createLeaseTableIfNotExists()); } @FunctionalInterface