diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/DdbTableConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/DdbTableConfig.java index ff9ac344..4507d961 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/DdbTableConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/DdbTableConfig.java @@ -14,10 +14,14 @@ */ package software.amazon.kinesis.common; +import java.util.Collection; +import java.util.Collections; + import lombok.Data; import lombok.NoArgsConstructor; import lombok.experimental.Accessors; import software.amazon.awssdk.services.dynamodb.model.BillingMode; +import software.amazon.awssdk.services.dynamodb.model.Tag; /** * Configurations of a DDB table created by KCL for its internal operations. @@ -54,4 +58,19 @@ public class DdbTableConfig { * if billing mode is PROVISIONED. */ private long writeCapacity; + + /** + * Flag to enable Point in Time Recovery on the DDB table. + */ + private boolean pointInTimeRecoveryEnabled = false; + + /** + * Flag to enable deletion protection on the DDB table. + */ + private boolean deletionProtectionEnabled = false; + + /** + * Tags to add to the DDB table. + */ + private Collection tags = Collections.emptyList(); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorStateDAO.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorStateDAO.java index c44328c7..36aefd0f 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorStateDAO.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorStateDAO.java @@ -65,7 +65,9 @@ import software.amazon.kinesis.leases.DynamoUtils; import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.InvalidStateException; import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; +import software.amazon.kinesis.utils.DdbUtil; +import static java.util.Objects.nonNull; import static software.amazon.kinesis.common.FutureUtils.unwrappingFuture; import static software.amazon.kinesis.coordinator.CoordinatorState.COORDINATOR_STATE_TABLE_HASH_KEY_ATTRIBUTE_NAME; @@ -302,6 +304,7 @@ public class CoordinatorStateDAO { "Creating CoordinatorState table timed out", response.matched().exception().orElse(null)))); } + unwrappingFuture(() -> DdbUtil.pitrEnabler(config, dynamoDbAsyncClient)); } } @@ -315,7 +318,12 @@ public class CoordinatorStateDAO { .attributeDefinitions(AttributeDefinition.builder() .attributeName(COORDINATOR_STATE_TABLE_HASH_KEY_ATTRIBUTE_NAME) .attributeType(ScalarAttributeType.S) - .build()); + .build()) + .deletionProtectionEnabled(config.deletionProtectionEnabled()); + + if (nonNull(config.tags()) && !config.tags().isEmpty()) { + requestBuilder.tags(config.tags()); + } switch (config.billingMode()) { case PAY_PER_REQUEST: diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/utils/DdbUtil.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/utils/DdbUtil.java new file mode 100644 index 00000000..f9047726 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/utils/DdbUtil.java @@ -0,0 +1,67 @@ +package software.amazon.kinesis.utils; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; + +import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; +import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition; +import software.amazon.awssdk.services.dynamodb.model.BillingMode; +import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest; +import software.amazon.awssdk.services.dynamodb.model.CreateTableResponse; +import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement; +import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput; +import software.amazon.awssdk.services.dynamodb.model.UpdateContinuousBackupsRequest; +import software.amazon.awssdk.services.dynamodb.model.UpdateContinuousBackupsResponse; +import software.amazon.kinesis.common.DdbTableConfig; + +import static java.util.Objects.nonNull; + +@Slf4j +public final class DdbUtil { + + @NonNull + public static Supplier> tableCreator( + final Supplier> keySchemaProvider, + final Supplier> attributeDefinitionProvider, + final DdbTableConfig tableConfig, + final DynamoDbAsyncClient dynamoDbAsyncClient) { + final CreateTableRequest.Builder createTableRequest = CreateTableRequest.builder() + .tableName(tableConfig.tableName()) + .keySchema(keySchemaProvider.get()) + .attributeDefinitions(attributeDefinitionProvider.get()) + .deletionProtectionEnabled(tableConfig.deletionProtectionEnabled()); + + if (nonNull(tableConfig.tags()) && !tableConfig.tags().isEmpty()) { + createTableRequest.tags(tableConfig.tags()); + } + + if (tableConfig.billingMode() == BillingMode.PROVISIONED) { + log.info( + "Creating table {} in provisioned mode with {}wcu and {}rcu", + tableConfig.tableName(), + tableConfig.writeCapacity(), + tableConfig.readCapacity()); + createTableRequest.provisionedThroughput(ProvisionedThroughput.builder() + .readCapacityUnits(tableConfig.readCapacity()) + .writeCapacityUnits(tableConfig.writeCapacity()) + .build()); + } + createTableRequest.billingMode(tableConfig.billingMode()); + return () -> dynamoDbAsyncClient.createTable(createTableRequest.build()); + } + + public static CompletableFuture pitrEnabler( + final DdbTableConfig tableConfig, final DynamoDbAsyncClient dynamoDbAsyncClient) { + if (tableConfig.pointInTimeRecoveryEnabled()) { + final UpdateContinuousBackupsRequest request = UpdateContinuousBackupsRequest.builder() + .tableName(tableConfig.tableName()) + .pointInTimeRecoverySpecification(builder -> builder.pointInTimeRecoveryEnabled(true)) + .build(); + return dynamoDbAsyncClient.updateContinuousBackups(request); + } + return CompletableFuture.completedFuture(null); + } +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/worker/metricstats/WorkerMetricStats.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/worker/metricstats/WorkerMetricStats.java index 280a0cce..fb26f55c 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/worker/metricstats/WorkerMetricStats.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/worker/metricstats/WorkerMetricStats.java @@ -1,5 +1,6 @@ package software.amazon.kinesis.worker.metricstats; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -16,6 +17,10 @@ import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbAttri import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbBean; import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbIgnore; import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbPartitionKey; +import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition; +import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement; +import software.amazon.awssdk.services.dynamodb.model.KeyType; +import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType; import software.amazon.kinesis.utils.ExponentialMovingAverage; import software.amazon.kinesis.worker.metric.WorkerMetricType; @@ -299,4 +304,18 @@ public class WorkerMetricStats { && metricStatsMap.get(workerMetricName) > operatingRange.get(workerMetricName).get(0); } + + public static List getKeySchema() { + return Collections.singletonList(KeySchemaElement.builder() + .attributeName(KEY_WORKER_ID) + .keyType(KeyType.HASH) + .build()); + } + + public static List getAttributeDefinitions() { + return Collections.singletonList(AttributeDefinition.builder() + .attributeName(KEY_WORKER_ID) + .attributeType(ScalarAttributeType.S) + .build()); + } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/worker/metricstats/WorkerMetricStatsDAO.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/worker/metricstats/WorkerMetricStatsDAO.java index 538db7a5..f7a42c9e 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/worker/metricstats/WorkerMetricStatsDAO.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/worker/metricstats/WorkerMetricStatsDAO.java @@ -5,15 +5,11 @@ import java.time.Instant; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.function.Supplier; import java.util.stream.Collectors; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import lombok.extern.slf4j.Slf4j; -import org.jetbrains.annotations.NotNull; import software.amazon.awssdk.core.waiters.WaiterResponse; import software.amazon.awssdk.enhanced.dynamodb.DynamoDbAsyncTable; import software.amazon.awssdk.enhanced.dynamodb.DynamoDbEnhancedAsyncClient; @@ -24,19 +20,19 @@ import software.amazon.awssdk.enhanced.dynamodb.model.DeleteItemEnhancedRequest; import software.amazon.awssdk.enhanced.dynamodb.model.UpdateItemEnhancedRequest; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.model.AttributeValue; -import software.amazon.awssdk.services.dynamodb.model.BillingMode; import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException; import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest; import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse; -import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput; import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException; import software.amazon.awssdk.services.dynamodb.model.TableDescription; import software.amazon.awssdk.services.dynamodb.model.TableStatus; import software.amazon.awssdk.services.dynamodb.waiters.DynamoDbAsyncWaiter; import software.amazon.kinesis.leases.LeaseManagementConfig.WorkerMetricsTableConfig; import software.amazon.kinesis.leases.exceptions.DependencyException; +import software.amazon.kinesis.utils.DdbUtil; import static java.util.Objects.nonNull; +import static software.amazon.kinesis.common.FutureUtils.unwrappingFuture; import static software.amazon.kinesis.worker.metricstats.WorkerMetricStats.KEY_LAST_UPDATE_TIME; import static software.amazon.kinesis.worker.metricstats.WorkerMetricStats.KEY_WORKER_ID; @@ -165,7 +161,11 @@ public class WorkerMetricStatsDAO { private void createTableIfDoesNotExist() throws DependencyException { TableDescription tableDescription = getTableDescription(); if (tableDescription == null) { - unwrappingFuture(getWorkerMetricsDynamoTableCreator()); + unwrappingFuture(DdbUtil.tableCreator( + WorkerMetricStats::getKeySchema, + WorkerMetricStats::getAttributeDefinitions, + tableConfig, + dynamoDbAsyncClient)); tableDescription = getTableDescription(); log.info("Table : {} created.", table.tableName()); } else { @@ -184,36 +184,8 @@ public class WorkerMetricStatsDAO { "Creating WorkerMetricStats table timed out", response.matched().exception().orElse(null)))); } - } - } - @NotNull - private Supplier> getWorkerMetricsDynamoTableCreator() { - final Supplier> tableCreator; - if (tableConfig.billingMode() == BillingMode.PROVISIONED) { - log.info( - "Creating worker metric stats table {} in provisioned mode with {}wcu and {}rcu", - tableConfig.tableName(), - tableConfig.writeCapacity(), - tableConfig.readCapacity()); - tableCreator = () -> table.createTable(r -> r.provisionedThroughput(ProvisionedThroughput.builder() - .readCapacityUnits(tableConfig.readCapacity()) - .writeCapacityUnits(tableConfig.writeCapacity()) - .build())); - } else { - tableCreator = table::createTable; - } - return tableCreator; - } - - static T unwrappingFuture(final Supplier> supplier) { - try { - return supplier.get().join(); - } catch (final CompletionException e) { - if (e.getCause() instanceof RuntimeException) { - throw (RuntimeException) e.getCause(); - } - throw e; + unwrappingFuture(() -> DdbUtil.pitrEnabler(tableConfig, dynamoDbAsyncClient)); } } } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/CoordinatorStateDAOTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/CoordinatorStateDAOTest.java index 3232209f..b2dbcfc2 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/CoordinatorStateDAOTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/CoordinatorStateDAOTest.java @@ -15,10 +15,13 @@ package software.amazon.kinesis.coordinator; import java.io.IOException; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.function.Consumer; import com.amazonaws.services.dynamodbv2.AcquireLockOptions; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClient; @@ -28,9 +31,14 @@ import com.amazonaws.services.dynamodbv2.local.shared.access.AmazonDynamoDBLocal import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import software.amazon.awssdk.core.internal.waiters.DefaultWaiterResponse; +import software.amazon.awssdk.core.waiters.WaiterResponse; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.model.AttributeValue; import software.amazon.awssdk.services.dynamodb.model.BillingMode; +import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest; +import software.amazon.awssdk.services.dynamodb.model.CreateTableResponse; import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest; import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse; import software.amazon.awssdk.services.dynamodb.model.ExpectedAttributeValue; @@ -38,8 +46,14 @@ import software.amazon.awssdk.services.dynamodb.model.GetItemRequest; import software.amazon.awssdk.services.dynamodb.model.GetItemResponse; import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput; import software.amazon.awssdk.services.dynamodb.model.PutItemRequest; +import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException; import software.amazon.awssdk.services.dynamodb.model.ScanRequest; import software.amazon.awssdk.services.dynamodb.model.ScanResponse; +import software.amazon.awssdk.services.dynamodb.model.TableDescription; +import software.amazon.awssdk.services.dynamodb.model.TableStatus; +import software.amazon.awssdk.services.dynamodb.model.Tag; +import software.amazon.awssdk.services.dynamodb.model.UpdateContinuousBackupsRequest; +import software.amazon.awssdk.services.dynamodb.waiters.DynamoDbAsyncWaiter; import software.amazon.kinesis.common.FutureUtils; import software.amazon.kinesis.coordinator.CoordinatorConfig.CoordinatorStateTableConfig; import software.amazon.kinesis.coordinator.migration.ClientVersion; @@ -48,6 +62,10 @@ import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.InvalidStateException; import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import static software.amazon.kinesis.coordinator.CoordinatorState.COORDINATOR_STATE_TABLE_HASH_KEY_ATTRIBUTE_NAME; import static software.amazon.kinesis.coordinator.CoordinatorState.LEADER_HASH_KEY; import static software.amazon.kinesis.coordinator.migration.MigrationState.CLIENT_VERSION_ATTRIBUTE_NAME; @@ -91,6 +109,97 @@ public class CoordinatorStateDAOTest { response.table().provisionedThroughput().writeCapacityUnits().longValue()); } + @Test + public void testTableCreationWithDeletionProtection_assertDeletionProtectionEnabled() + throws DependencyException, ExecutionException, InterruptedException { + + final CoordinatorStateTableConfig config = getCoordinatorStateConfig( + "testTableCreationWithDeletionProtection", + ProvisionedThroughput.builder() + .writeCapacityUnits(30L) + .readCapacityUnits(15L) + .build()); + config.deletionProtectionEnabled(true); + final CoordinatorStateDAO doaUnderTest = new CoordinatorStateDAO(dynamoDbAsyncClient, config); + + doaUnderTest.initialize(); + + final DescribeTableResponse response = dynamoDbAsyncClient + .describeTable(DescribeTableRequest.builder() + .tableName("testTableCreationWithDeletionProtection-CoordinatorState") + .build()) + .get(); + + Assertions.assertTrue(response.table().deletionProtectionEnabled()); + } + + /** + * DynamoDBLocal does not support PITR and tags and thus this test is using mocks. + */ + @Test + public void testTableCreationWithTagsAndPitr_assertTags() throws DependencyException { + final DynamoDbAsyncWaiter waiter = mock(DynamoDbAsyncWaiter.class); + final WaiterResponse waiterResponse = DefaultWaiterResponse.builder() + .response(dummyDescribeTableResponse(TableStatus.ACTIVE)) + .attemptsExecuted(1) + .build(); + when(waiter.waitUntilTableExists(any(Consumer.class), any(Consumer.class))) + .thenReturn(CompletableFuture.completedFuture((WaiterResponse) waiterResponse)); + final DynamoDbAsyncClient dbAsyncClient = mock(DynamoDbAsyncClient.class); + when(dbAsyncClient.waiter()).thenReturn(waiter); + when(dbAsyncClient.createTable(any(CreateTableRequest.class))) + .thenReturn(CompletableFuture.completedFuture(CreateTableResponse.builder() + .tableDescription( + dummyDescribeTableResponse(TableStatus.CREATING).table()) + .build())); + when(dbAsyncClient.updateContinuousBackups(any(UpdateContinuousBackupsRequest.class))) + .thenReturn(CompletableFuture.completedFuture(null)); + when(dbAsyncClient.describeTable(any(DescribeTableRequest.class))) + .thenThrow(ResourceNotFoundException.builder().build()) + .thenReturn(CompletableFuture.completedFuture(dummyDescribeTableResponse(TableStatus.ACTIVE))); + + final ArgumentCaptor createTableRequestArgumentCaptor = + ArgumentCaptor.forClass(CreateTableRequest.class); + final ArgumentCaptor updateContinuousBackupsRequestArgumentCaptor = + ArgumentCaptor.forClass(UpdateContinuousBackupsRequest.class); + + final CoordinatorStateTableConfig config = getCoordinatorStateConfig( + "testTableCreationWithTagsAndPitr", + ProvisionedThroughput.builder() + .writeCapacityUnits(30L) + .readCapacityUnits(15L) + .build()); + config.tableName("testTableCreationWithTagsAndPitr"); + config.pointInTimeRecoveryEnabled(true); + config.tags( + Collections.singleton(Tag.builder().key("Key").value("Value").build())); + + final CoordinatorStateDAO doaUnderTest = new CoordinatorStateDAO(dbAsyncClient, config); + doaUnderTest.initialize(); + + verify(dbAsyncClient).createTable(createTableRequestArgumentCaptor.capture()); + verify(dbAsyncClient).updateContinuousBackups(updateContinuousBackupsRequestArgumentCaptor.capture()); + Assertions.assertEquals( + 1, createTableRequestArgumentCaptor.getValue().tags().size()); + + Assertions.assertEquals( + "Key", createTableRequestArgumentCaptor.getValue().tags().get(0).key()); + Assertions.assertEquals( + "Value", + createTableRequestArgumentCaptor.getValue().tags().get(0).value()); + Assertions.assertTrue(updateContinuousBackupsRequestArgumentCaptor + .getAllValues() + .get(0) + .pointInTimeRecoverySpecification() + .pointInTimeRecoveryEnabled()); + } + + private static DescribeTableResponse dummyDescribeTableResponse(final TableStatus tableStatus) { + return DescribeTableResponse.builder() + .table(TableDescription.builder().tableStatus(tableStatus).build()) + .build(); + } + @Test public void testPayPerUseTableCreation_DefaultTableName() throws ExecutionException, InterruptedException, DependencyException { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/worker/metricstats/WorkerMetricsDAOTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/worker/metricstats/WorkerMetricsDAOTest.java index afd9712a..d712f46f 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/worker/metricstats/WorkerMetricsDAOTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/worker/metricstats/WorkerMetricsDAOTest.java @@ -3,7 +3,9 @@ package software.amazon.kinesis.worker.metricstats; import java.time.Instant; import java.util.Collections; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import java.util.stream.IntStream; import com.amazonaws.services.dynamodbv2.local.embedded.DynamoDBEmbedded; @@ -11,16 +13,25 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import software.amazon.awssdk.core.internal.waiters.DefaultWaiterResponse; +import software.amazon.awssdk.core.waiters.WaiterResponse; import software.amazon.awssdk.enhanced.dynamodb.DynamoDbAsyncTable; import software.amazon.awssdk.enhanced.dynamodb.DynamoDbEnhancedAsyncClient; import software.amazon.awssdk.enhanced.dynamodb.Key; import software.amazon.awssdk.enhanced.dynamodb.TableSchema; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.model.AttributeValue; +import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest; import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest; import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse; import software.amazon.awssdk.services.dynamodb.model.PutItemRequest; +import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException; +import software.amazon.awssdk.services.dynamodb.model.TableDescription; import software.amazon.awssdk.services.dynamodb.model.TableStatus; +import software.amazon.awssdk.services.dynamodb.model.Tag; +import software.amazon.awssdk.services.dynamodb.model.UpdateContinuousBackupsRequest; +import software.amazon.awssdk.services.dynamodb.waiters.DynamoDbAsyncWaiter; import software.amazon.kinesis.leases.LeaseManagementConfig.WorkerMetricsTableConfig; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; @@ -29,8 +40,12 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import static software.amazon.awssdk.services.dynamodb.model.BillingMode.PROVISIONED; -import static software.amazon.kinesis.worker.metricstats.WorkerMetricStatsDAO.unwrappingFuture; +import static software.amazon.kinesis.common.FutureUtils.unwrappingFuture; class WorkerMetricsDAOTest { @@ -46,11 +61,18 @@ class WorkerMetricsDAOTest { dynamoDbEnhancedAsyncClient.table(TEST_WORKER_METRICS_TABLE, TableSchema.fromBean(WorkerMetricStats.class)); private WorkerMetricStatsDAO workerMetricsDAO; - void setUp() { - final WorkerMetricsTableConfig tableConfig = - (WorkerMetricsTableConfig) new WorkerMetricsTableConfig(null).tableName(TEST_WORKER_METRICS_TABLE); - this.workerMetricsDAO = new WorkerMetricStatsDAO(dynamoDbAsyncClient, tableConfig, TEST_REPORTER_FREQ_MILLIS); - assertDoesNotThrow(() -> this.workerMetricsDAO.initialize()); + private void setUp() { + final WorkerMetricsTableConfig tableConfig = new WorkerMetricsTableConfig(null); + tableConfig.tableName(TEST_WORKER_METRICS_TABLE); + this.workerMetricsDAO = setUp(tableConfig, this.dynamoDbAsyncClient); + } + + private WorkerMetricStatsDAO setUp( + final WorkerMetricsTableConfig workerMetricsTableConfig, final DynamoDbAsyncClient dynamoDbAsyncClient) { + final WorkerMetricStatsDAO dao = + new WorkerMetricStatsDAO(dynamoDbAsyncClient, workerMetricsTableConfig, TEST_REPORTER_FREQ_MILLIS); + assertDoesNotThrow(dao::initialize); + return dao; } @Test @@ -61,6 +83,80 @@ class WorkerMetricsDAOTest { .tableName(TEST_WORKER_METRICS_TABLE) .build())); assertEquals(describeTableResponse.table().tableStatus(), TableStatus.ACTIVE, "Table status is not ACTIVE"); + assertFalse(describeTableResponse.table().deletionProtectionEnabled()); + } + + @Test + void initialize_withDeletionProtection_assertDeletionProtection() { + final WorkerMetricsTableConfig config = new WorkerMetricsTableConfig(null); + config.tableName(TEST_WORKER_METRICS_TABLE); + config.deletionProtectionEnabled(true); + setUp(config, dynamoDbAsyncClient); + final DescribeTableResponse describeTableResponse = + unwrappingFuture(() -> dynamoDbAsyncClient.describeTable(DescribeTableRequest.builder() + .tableName(TEST_WORKER_METRICS_TABLE) + .build())); + + assertTrue(describeTableResponse.table().deletionProtectionEnabled()); + } + + /** + * DynamoDBLocal does not support PITR and tags and thus this test is using mocks. + */ + @Test + void initialize_withTagAndPitr_assertCall() { + final DynamoDbAsyncWaiter waiter = mock(DynamoDbAsyncWaiter.class); + final WaiterResponse waiterResponse = DefaultWaiterResponse.builder() + .response(dummyDescribeTableResponse(TableStatus.ACTIVE)) + .attemptsExecuted(1) + .build(); + when(waiter.waitUntilTableExists(any(Consumer.class), any(Consumer.class))) + .thenReturn(CompletableFuture.completedFuture((WaiterResponse) waiterResponse)); + + final DynamoDbAsyncClient dbAsyncClient = mock(DynamoDbAsyncClient.class); + when(dbAsyncClient.waiter()).thenReturn(waiter); + when(dbAsyncClient.createTable(any(CreateTableRequest.class))) + .thenReturn(CompletableFuture.completedFuture(null)); + when(dbAsyncClient.updateContinuousBackups(any(UpdateContinuousBackupsRequest.class))) + .thenReturn(CompletableFuture.completedFuture(null)); + when(dbAsyncClient.describeTable(any(DescribeTableRequest.class))) + .thenThrow(ResourceNotFoundException.builder().build()) + .thenReturn(CompletableFuture.completedFuture(dummyDescribeTableResponse(TableStatus.CREATING))) + .thenReturn(CompletableFuture.completedFuture(dummyDescribeTableResponse(TableStatus.ACTIVE))); + + final ArgumentCaptor createTableRequestArgumentCaptor = + ArgumentCaptor.forClass(CreateTableRequest.class); + final ArgumentCaptor updateContinuousBackupsRequestArgumentCaptor = + ArgumentCaptor.forClass(UpdateContinuousBackupsRequest.class); + + final WorkerMetricsTableConfig config = new WorkerMetricsTableConfig(null); + config.tableName(TEST_WORKER_METRICS_TABLE); + config.pointInTimeRecoveryEnabled(true); + config.tags( + Collections.singleton(Tag.builder().key("Key").value("Value").build())); + setUp(config, dbAsyncClient); + + verify(dbAsyncClient).createTable(createTableRequestArgumentCaptor.capture()); + verify(dbAsyncClient).updateContinuousBackups(updateContinuousBackupsRequestArgumentCaptor.capture()); + + assertEquals(1, createTableRequestArgumentCaptor.getValue().tags().size()); + assertEquals( + "Key", createTableRequestArgumentCaptor.getValue().tags().get(0).key()); + assertEquals( + "Value", + createTableRequestArgumentCaptor.getValue().tags().get(0).value()); + + assertTrue(updateContinuousBackupsRequestArgumentCaptor + .getAllValues() + .get(0) + .pointInTimeRecoverySpecification() + .pointInTimeRecoveryEnabled()); + } + + private static DescribeTableResponse dummyDescribeTableResponse(final TableStatus tableStatus) { + return DescribeTableResponse.builder() + .table(TableDescription.builder().tableStatus(tableStatus).build()) + .build(); } @Test