Add support for PITR,Tags and Deletion protection for WorkerMetrics and CooridnatorStats table
This commit is contained in:
parent
fa4bf5ba68
commit
3c4771a564
7 changed files with 333 additions and 43 deletions
|
|
@ -14,10 +14,14 @@
|
|||
*/
|
||||
package software.amazon.kinesis.common;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.experimental.Accessors;
|
||||
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
|
||||
import software.amazon.awssdk.services.dynamodb.model.Tag;
|
||||
|
||||
/**
|
||||
* Configurations of a DDB table created by KCL for its internal operations.
|
||||
|
|
@ -54,4 +58,19 @@ public class DdbTableConfig {
|
|||
* if billing mode is PROVISIONED.
|
||||
*/
|
||||
private long writeCapacity;
|
||||
|
||||
/**
|
||||
* Flag to enable Point in Time Recovery on the DDB table.
|
||||
*/
|
||||
private boolean pointInTimeRecoveryEnabled = false;
|
||||
|
||||
/**
|
||||
* Flag to enable deletion protection on the DDB table.
|
||||
*/
|
||||
private boolean deletionProtectionEnabled = false;
|
||||
|
||||
/**
|
||||
* Tags to add to the DDB table.
|
||||
*/
|
||||
private Collection<Tag> tags = Collections.emptyList();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -65,7 +65,9 @@ import software.amazon.kinesis.leases.DynamoUtils;
|
|||
import software.amazon.kinesis.leases.exceptions.DependencyException;
|
||||
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
|
||||
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
|
||||
import software.amazon.kinesis.utils.DdbUtil;
|
||||
|
||||
import static java.util.Objects.nonNull;
|
||||
import static software.amazon.kinesis.common.FutureUtils.unwrappingFuture;
|
||||
import static software.amazon.kinesis.coordinator.CoordinatorState.COORDINATOR_STATE_TABLE_HASH_KEY_ATTRIBUTE_NAME;
|
||||
|
||||
|
|
@ -302,6 +304,7 @@ public class CoordinatorStateDAO {
|
|||
"Creating CoordinatorState table timed out",
|
||||
response.matched().exception().orElse(null))));
|
||||
}
|
||||
unwrappingFuture(() -> DdbUtil.pitrEnabler(config, dynamoDbAsyncClient));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -315,7 +318,12 @@ public class CoordinatorStateDAO {
|
|||
.attributeDefinitions(AttributeDefinition.builder()
|
||||
.attributeName(COORDINATOR_STATE_TABLE_HASH_KEY_ATTRIBUTE_NAME)
|
||||
.attributeType(ScalarAttributeType.S)
|
||||
.build());
|
||||
.build())
|
||||
.deletionProtectionEnabled(config.deletionProtectionEnabled());
|
||||
|
||||
if (nonNull(config.tags()) && !config.tags().isEmpty()) {
|
||||
requestBuilder.tags(config.tags());
|
||||
}
|
||||
|
||||
switch (config.billingMode()) {
|
||||
case PAY_PER_REQUEST:
|
||||
|
|
|
|||
|
|
@ -0,0 +1,67 @@
|
|||
package software.amazon.kinesis.utils;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import lombok.NonNull;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
|
||||
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
|
||||
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
|
||||
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
|
||||
import software.amazon.awssdk.services.dynamodb.model.CreateTableResponse;
|
||||
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
|
||||
import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput;
|
||||
import software.amazon.awssdk.services.dynamodb.model.UpdateContinuousBackupsRequest;
|
||||
import software.amazon.awssdk.services.dynamodb.model.UpdateContinuousBackupsResponse;
|
||||
import software.amazon.kinesis.common.DdbTableConfig;
|
||||
|
||||
import static java.util.Objects.nonNull;
|
||||
|
||||
@Slf4j
|
||||
public final class DdbUtil {
|
||||
|
||||
@NonNull
|
||||
public static Supplier<CompletableFuture<CreateTableResponse>> tableCreator(
|
||||
final Supplier<List<KeySchemaElement>> keySchemaProvider,
|
||||
final Supplier<List<AttributeDefinition>> attributeDefinitionProvider,
|
||||
final DdbTableConfig tableConfig,
|
||||
final DynamoDbAsyncClient dynamoDbAsyncClient) {
|
||||
final CreateTableRequest.Builder createTableRequest = CreateTableRequest.builder()
|
||||
.tableName(tableConfig.tableName())
|
||||
.keySchema(keySchemaProvider.get())
|
||||
.attributeDefinitions(attributeDefinitionProvider.get())
|
||||
.deletionProtectionEnabled(tableConfig.deletionProtectionEnabled());
|
||||
|
||||
if (nonNull(tableConfig.tags()) && !tableConfig.tags().isEmpty()) {
|
||||
createTableRequest.tags(tableConfig.tags());
|
||||
}
|
||||
|
||||
if (tableConfig.billingMode() == BillingMode.PROVISIONED) {
|
||||
log.info(
|
||||
"Creating table {} in provisioned mode with {}wcu and {}rcu",
|
||||
tableConfig.tableName(),
|
||||
tableConfig.writeCapacity(),
|
||||
tableConfig.readCapacity());
|
||||
createTableRequest.provisionedThroughput(ProvisionedThroughput.builder()
|
||||
.readCapacityUnits(tableConfig.readCapacity())
|
||||
.writeCapacityUnits(tableConfig.writeCapacity())
|
||||
.build());
|
||||
}
|
||||
createTableRequest.billingMode(tableConfig.billingMode());
|
||||
return () -> dynamoDbAsyncClient.createTable(createTableRequest.build());
|
||||
}
|
||||
|
||||
public static CompletableFuture<UpdateContinuousBackupsResponse> pitrEnabler(
|
||||
final DdbTableConfig tableConfig, final DynamoDbAsyncClient dynamoDbAsyncClient) {
|
||||
if (tableConfig.pointInTimeRecoveryEnabled()) {
|
||||
final UpdateContinuousBackupsRequest request = UpdateContinuousBackupsRequest.builder()
|
||||
.tableName(tableConfig.tableName())
|
||||
.pointInTimeRecoverySpecification(builder -> builder.pointInTimeRecoveryEnabled(true))
|
||||
.build();
|
||||
return dynamoDbAsyncClient.updateContinuousBackups(request);
|
||||
}
|
||||
return CompletableFuture.completedFuture(null);
|
||||
}
|
||||
}
|
||||
|
|
@ -1,5 +1,6 @@
|
|||
package software.amazon.kinesis.worker.metricstats;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
|
@ -16,6 +17,10 @@ import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbAttri
|
|||
import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbBean;
|
||||
import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbIgnore;
|
||||
import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbPartitionKey;
|
||||
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
|
||||
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
|
||||
import software.amazon.awssdk.services.dynamodb.model.KeyType;
|
||||
import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
|
||||
import software.amazon.kinesis.utils.ExponentialMovingAverage;
|
||||
import software.amazon.kinesis.worker.metric.WorkerMetricType;
|
||||
|
||||
|
|
@ -299,4 +304,18 @@ public class WorkerMetricStats {
|
|||
&& metricStatsMap.get(workerMetricName)
|
||||
> operatingRange.get(workerMetricName).get(0);
|
||||
}
|
||||
|
||||
public static List<KeySchemaElement> getKeySchema() {
|
||||
return Collections.singletonList(KeySchemaElement.builder()
|
||||
.attributeName(KEY_WORKER_ID)
|
||||
.keyType(KeyType.HASH)
|
||||
.build());
|
||||
}
|
||||
|
||||
public static List<AttributeDefinition> getAttributeDefinitions() {
|
||||
return Collections.singletonList(AttributeDefinition.builder()
|
||||
.attributeName(KEY_WORKER_ID)
|
||||
.attributeType(ScalarAttributeType.S)
|
||||
.build());
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,15 +5,11 @@ import java.time.Instant;
|
|||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CompletionException;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import software.amazon.awssdk.core.waiters.WaiterResponse;
|
||||
import software.amazon.awssdk.enhanced.dynamodb.DynamoDbAsyncTable;
|
||||
import software.amazon.awssdk.enhanced.dynamodb.DynamoDbEnhancedAsyncClient;
|
||||
|
|
@ -24,19 +20,19 @@ import software.amazon.awssdk.enhanced.dynamodb.model.DeleteItemEnhancedRequest;
|
|||
import software.amazon.awssdk.enhanced.dynamodb.model.UpdateItemEnhancedRequest;
|
||||
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
|
||||
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
|
||||
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
|
||||
import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
|
||||
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
|
||||
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
|
||||
import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput;
|
||||
import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
|
||||
import software.amazon.awssdk.services.dynamodb.model.TableDescription;
|
||||
import software.amazon.awssdk.services.dynamodb.model.TableStatus;
|
||||
import software.amazon.awssdk.services.dynamodb.waiters.DynamoDbAsyncWaiter;
|
||||
import software.amazon.kinesis.leases.LeaseManagementConfig.WorkerMetricsTableConfig;
|
||||
import software.amazon.kinesis.leases.exceptions.DependencyException;
|
||||
import software.amazon.kinesis.utils.DdbUtil;
|
||||
|
||||
import static java.util.Objects.nonNull;
|
||||
import static software.amazon.kinesis.common.FutureUtils.unwrappingFuture;
|
||||
import static software.amazon.kinesis.worker.metricstats.WorkerMetricStats.KEY_LAST_UPDATE_TIME;
|
||||
import static software.amazon.kinesis.worker.metricstats.WorkerMetricStats.KEY_WORKER_ID;
|
||||
|
||||
|
|
@ -165,7 +161,11 @@ public class WorkerMetricStatsDAO {
|
|||
private void createTableIfDoesNotExist() throws DependencyException {
|
||||
TableDescription tableDescription = getTableDescription();
|
||||
if (tableDescription == null) {
|
||||
unwrappingFuture(getWorkerMetricsDynamoTableCreator());
|
||||
unwrappingFuture(DdbUtil.tableCreator(
|
||||
WorkerMetricStats::getKeySchema,
|
||||
WorkerMetricStats::getAttributeDefinitions,
|
||||
tableConfig,
|
||||
dynamoDbAsyncClient));
|
||||
tableDescription = getTableDescription();
|
||||
log.info("Table : {} created.", table.tableName());
|
||||
} else {
|
||||
|
|
@ -184,36 +184,8 @@ public class WorkerMetricStatsDAO {
|
|||
"Creating WorkerMetricStats table timed out",
|
||||
response.matched().exception().orElse(null))));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@NotNull
|
||||
private Supplier<CompletableFuture<Void>> getWorkerMetricsDynamoTableCreator() {
|
||||
final Supplier<CompletableFuture<Void>> tableCreator;
|
||||
if (tableConfig.billingMode() == BillingMode.PROVISIONED) {
|
||||
log.info(
|
||||
"Creating worker metric stats table {} in provisioned mode with {}wcu and {}rcu",
|
||||
tableConfig.tableName(),
|
||||
tableConfig.writeCapacity(),
|
||||
tableConfig.readCapacity());
|
||||
tableCreator = () -> table.createTable(r -> r.provisionedThroughput(ProvisionedThroughput.builder()
|
||||
.readCapacityUnits(tableConfig.readCapacity())
|
||||
.writeCapacityUnits(tableConfig.writeCapacity())
|
||||
.build()));
|
||||
} else {
|
||||
tableCreator = table::createTable;
|
||||
}
|
||||
return tableCreator;
|
||||
}
|
||||
|
||||
static <T> T unwrappingFuture(final Supplier<CompletableFuture<T>> supplier) {
|
||||
try {
|
||||
return supplier.get().join();
|
||||
} catch (final CompletionException e) {
|
||||
if (e.getCause() instanceof RuntimeException) {
|
||||
throw (RuntimeException) e.getCause();
|
||||
}
|
||||
throw e;
|
||||
unwrappingFuture(() -> DdbUtil.pitrEnabler(tableConfig, dynamoDbAsyncClient));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -15,10 +15,13 @@
|
|||
package software.amazon.kinesis.coordinator;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import com.amazonaws.services.dynamodbv2.AcquireLockOptions;
|
||||
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClient;
|
||||
|
|
@ -28,9 +31,14 @@ import com.amazonaws.services.dynamodbv2.local.shared.access.AmazonDynamoDBLocal
|
|||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import software.amazon.awssdk.core.internal.waiters.DefaultWaiterResponse;
|
||||
import software.amazon.awssdk.core.waiters.WaiterResponse;
|
||||
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
|
||||
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
|
||||
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
|
||||
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
|
||||
import software.amazon.awssdk.services.dynamodb.model.CreateTableResponse;
|
||||
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
|
||||
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
|
||||
import software.amazon.awssdk.services.dynamodb.model.ExpectedAttributeValue;
|
||||
|
|
@ -38,8 +46,14 @@ import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
|
|||
import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
|
||||
import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput;
|
||||
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
|
||||
import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
|
||||
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
|
||||
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
|
||||
import software.amazon.awssdk.services.dynamodb.model.TableDescription;
|
||||
import software.amazon.awssdk.services.dynamodb.model.TableStatus;
|
||||
import software.amazon.awssdk.services.dynamodb.model.Tag;
|
||||
import software.amazon.awssdk.services.dynamodb.model.UpdateContinuousBackupsRequest;
|
||||
import software.amazon.awssdk.services.dynamodb.waiters.DynamoDbAsyncWaiter;
|
||||
import software.amazon.kinesis.common.FutureUtils;
|
||||
import software.amazon.kinesis.coordinator.CoordinatorConfig.CoordinatorStateTableConfig;
|
||||
import software.amazon.kinesis.coordinator.migration.ClientVersion;
|
||||
|
|
@ -48,6 +62,10 @@ import software.amazon.kinesis.leases.exceptions.DependencyException;
|
|||
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
|
||||
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
|
||||
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
import static software.amazon.kinesis.coordinator.CoordinatorState.COORDINATOR_STATE_TABLE_HASH_KEY_ATTRIBUTE_NAME;
|
||||
import static software.amazon.kinesis.coordinator.CoordinatorState.LEADER_HASH_KEY;
|
||||
import static software.amazon.kinesis.coordinator.migration.MigrationState.CLIENT_VERSION_ATTRIBUTE_NAME;
|
||||
|
|
@ -91,6 +109,97 @@ public class CoordinatorStateDAOTest {
|
|||
response.table().provisionedThroughput().writeCapacityUnits().longValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTableCreationWithDeletionProtection_assertDeletionProtectionEnabled()
|
||||
throws DependencyException, ExecutionException, InterruptedException {
|
||||
|
||||
final CoordinatorStateTableConfig config = getCoordinatorStateConfig(
|
||||
"testTableCreationWithDeletionProtection",
|
||||
ProvisionedThroughput.builder()
|
||||
.writeCapacityUnits(30L)
|
||||
.readCapacityUnits(15L)
|
||||
.build());
|
||||
config.deletionProtectionEnabled(true);
|
||||
final CoordinatorStateDAO doaUnderTest = new CoordinatorStateDAO(dynamoDbAsyncClient, config);
|
||||
|
||||
doaUnderTest.initialize();
|
||||
|
||||
final DescribeTableResponse response = dynamoDbAsyncClient
|
||||
.describeTable(DescribeTableRequest.builder()
|
||||
.tableName("testTableCreationWithDeletionProtection-CoordinatorState")
|
||||
.build())
|
||||
.get();
|
||||
|
||||
Assertions.assertTrue(response.table().deletionProtectionEnabled());
|
||||
}
|
||||
|
||||
/**
|
||||
* DynamoDBLocal does not support PITR and tags and thus this test is using mocks.
|
||||
*/
|
||||
@Test
|
||||
public void testTableCreationWithTagsAndPitr_assertTags() throws DependencyException {
|
||||
final DynamoDbAsyncWaiter waiter = mock(DynamoDbAsyncWaiter.class);
|
||||
final WaiterResponse<?> waiterResponse = DefaultWaiterResponse.builder()
|
||||
.response(dummyDescribeTableResponse(TableStatus.ACTIVE))
|
||||
.attemptsExecuted(1)
|
||||
.build();
|
||||
when(waiter.waitUntilTableExists(any(Consumer.class), any(Consumer.class)))
|
||||
.thenReturn(CompletableFuture.completedFuture((WaiterResponse<DescribeTableResponse>) waiterResponse));
|
||||
final DynamoDbAsyncClient dbAsyncClient = mock(DynamoDbAsyncClient.class);
|
||||
when(dbAsyncClient.waiter()).thenReturn(waiter);
|
||||
when(dbAsyncClient.createTable(any(CreateTableRequest.class)))
|
||||
.thenReturn(CompletableFuture.completedFuture(CreateTableResponse.builder()
|
||||
.tableDescription(
|
||||
dummyDescribeTableResponse(TableStatus.CREATING).table())
|
||||
.build()));
|
||||
when(dbAsyncClient.updateContinuousBackups(any(UpdateContinuousBackupsRequest.class)))
|
||||
.thenReturn(CompletableFuture.completedFuture(null));
|
||||
when(dbAsyncClient.describeTable(any(DescribeTableRequest.class)))
|
||||
.thenThrow(ResourceNotFoundException.builder().build())
|
||||
.thenReturn(CompletableFuture.completedFuture(dummyDescribeTableResponse(TableStatus.ACTIVE)));
|
||||
|
||||
final ArgumentCaptor<CreateTableRequest> createTableRequestArgumentCaptor =
|
||||
ArgumentCaptor.forClass(CreateTableRequest.class);
|
||||
final ArgumentCaptor<UpdateContinuousBackupsRequest> updateContinuousBackupsRequestArgumentCaptor =
|
||||
ArgumentCaptor.forClass(UpdateContinuousBackupsRequest.class);
|
||||
|
||||
final CoordinatorStateTableConfig config = getCoordinatorStateConfig(
|
||||
"testTableCreationWithTagsAndPitr",
|
||||
ProvisionedThroughput.builder()
|
||||
.writeCapacityUnits(30L)
|
||||
.readCapacityUnits(15L)
|
||||
.build());
|
||||
config.tableName("testTableCreationWithTagsAndPitr");
|
||||
config.pointInTimeRecoveryEnabled(true);
|
||||
config.tags(
|
||||
Collections.singleton(Tag.builder().key("Key").value("Value").build()));
|
||||
|
||||
final CoordinatorStateDAO doaUnderTest = new CoordinatorStateDAO(dbAsyncClient, config);
|
||||
doaUnderTest.initialize();
|
||||
|
||||
verify(dbAsyncClient).createTable(createTableRequestArgumentCaptor.capture());
|
||||
verify(dbAsyncClient).updateContinuousBackups(updateContinuousBackupsRequestArgumentCaptor.capture());
|
||||
Assertions.assertEquals(
|
||||
1, createTableRequestArgumentCaptor.getValue().tags().size());
|
||||
|
||||
Assertions.assertEquals(
|
||||
"Key", createTableRequestArgumentCaptor.getValue().tags().get(0).key());
|
||||
Assertions.assertEquals(
|
||||
"Value",
|
||||
createTableRequestArgumentCaptor.getValue().tags().get(0).value());
|
||||
Assertions.assertTrue(updateContinuousBackupsRequestArgumentCaptor
|
||||
.getAllValues()
|
||||
.get(0)
|
||||
.pointInTimeRecoverySpecification()
|
||||
.pointInTimeRecoveryEnabled());
|
||||
}
|
||||
|
||||
private static DescribeTableResponse dummyDescribeTableResponse(final TableStatus tableStatus) {
|
||||
return DescribeTableResponse.builder()
|
||||
.table(TableDescription.builder().tableStatus(tableStatus).build())
|
||||
.build();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPayPerUseTableCreation_DefaultTableName()
|
||||
throws ExecutionException, InterruptedException, DependencyException {
|
||||
|
|
|
|||
|
|
@ -3,7 +3,9 @@ package software.amazon.kinesis.worker.metricstats;
|
|||
import java.time.Instant;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import com.amazonaws.services.dynamodbv2.local.embedded.DynamoDBEmbedded;
|
||||
|
|
@ -11,16 +13,25 @@ import com.google.common.collect.ImmutableList;
|
|||
import com.google.common.collect.ImmutableMap;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import software.amazon.awssdk.core.internal.waiters.DefaultWaiterResponse;
|
||||
import software.amazon.awssdk.core.waiters.WaiterResponse;
|
||||
import software.amazon.awssdk.enhanced.dynamodb.DynamoDbAsyncTable;
|
||||
import software.amazon.awssdk.enhanced.dynamodb.DynamoDbEnhancedAsyncClient;
|
||||
import software.amazon.awssdk.enhanced.dynamodb.Key;
|
||||
import software.amazon.awssdk.enhanced.dynamodb.TableSchema;
|
||||
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
|
||||
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
|
||||
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
|
||||
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
|
||||
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
|
||||
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
|
||||
import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
|
||||
import software.amazon.awssdk.services.dynamodb.model.TableDescription;
|
||||
import software.amazon.awssdk.services.dynamodb.model.TableStatus;
|
||||
import software.amazon.awssdk.services.dynamodb.model.Tag;
|
||||
import software.amazon.awssdk.services.dynamodb.model.UpdateContinuousBackupsRequest;
|
||||
import software.amazon.awssdk.services.dynamodb.waiters.DynamoDbAsyncWaiter;
|
||||
import software.amazon.kinesis.leases.LeaseManagementConfig.WorkerMetricsTableConfig;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
|
||||
|
|
@ -29,8 +40,12 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
|
|||
import static org.junit.jupiter.api.Assertions.assertNotEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
import static software.amazon.awssdk.services.dynamodb.model.BillingMode.PROVISIONED;
|
||||
import static software.amazon.kinesis.worker.metricstats.WorkerMetricStatsDAO.unwrappingFuture;
|
||||
import static software.amazon.kinesis.common.FutureUtils.unwrappingFuture;
|
||||
|
||||
class WorkerMetricsDAOTest {
|
||||
|
||||
|
|
@ -46,11 +61,18 @@ class WorkerMetricsDAOTest {
|
|||
dynamoDbEnhancedAsyncClient.table(TEST_WORKER_METRICS_TABLE, TableSchema.fromBean(WorkerMetricStats.class));
|
||||
private WorkerMetricStatsDAO workerMetricsDAO;
|
||||
|
||||
void setUp() {
|
||||
final WorkerMetricsTableConfig tableConfig =
|
||||
(WorkerMetricsTableConfig) new WorkerMetricsTableConfig(null).tableName(TEST_WORKER_METRICS_TABLE);
|
||||
this.workerMetricsDAO = new WorkerMetricStatsDAO(dynamoDbAsyncClient, tableConfig, TEST_REPORTER_FREQ_MILLIS);
|
||||
assertDoesNotThrow(() -> this.workerMetricsDAO.initialize());
|
||||
private void setUp() {
|
||||
final WorkerMetricsTableConfig tableConfig = new WorkerMetricsTableConfig(null);
|
||||
tableConfig.tableName(TEST_WORKER_METRICS_TABLE);
|
||||
this.workerMetricsDAO = setUp(tableConfig, this.dynamoDbAsyncClient);
|
||||
}
|
||||
|
||||
private WorkerMetricStatsDAO setUp(
|
||||
final WorkerMetricsTableConfig workerMetricsTableConfig, final DynamoDbAsyncClient dynamoDbAsyncClient) {
|
||||
final WorkerMetricStatsDAO dao =
|
||||
new WorkerMetricStatsDAO(dynamoDbAsyncClient, workerMetricsTableConfig, TEST_REPORTER_FREQ_MILLIS);
|
||||
assertDoesNotThrow(dao::initialize);
|
||||
return dao;
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -61,6 +83,80 @@ class WorkerMetricsDAOTest {
|
|||
.tableName(TEST_WORKER_METRICS_TABLE)
|
||||
.build()));
|
||||
assertEquals(describeTableResponse.table().tableStatus(), TableStatus.ACTIVE, "Table status is not ACTIVE");
|
||||
assertFalse(describeTableResponse.table().deletionProtectionEnabled());
|
||||
}
|
||||
|
||||
@Test
|
||||
void initialize_withDeletionProtection_assertDeletionProtection() {
|
||||
final WorkerMetricsTableConfig config = new WorkerMetricsTableConfig(null);
|
||||
config.tableName(TEST_WORKER_METRICS_TABLE);
|
||||
config.deletionProtectionEnabled(true);
|
||||
setUp(config, dynamoDbAsyncClient);
|
||||
final DescribeTableResponse describeTableResponse =
|
||||
unwrappingFuture(() -> dynamoDbAsyncClient.describeTable(DescribeTableRequest.builder()
|
||||
.tableName(TEST_WORKER_METRICS_TABLE)
|
||||
.build()));
|
||||
|
||||
assertTrue(describeTableResponse.table().deletionProtectionEnabled());
|
||||
}
|
||||
|
||||
/**
|
||||
* DynamoDBLocal does not support PITR and tags and thus this test is using mocks.
|
||||
*/
|
||||
@Test
|
||||
void initialize_withTagAndPitr_assertCall() {
|
||||
final DynamoDbAsyncWaiter waiter = mock(DynamoDbAsyncWaiter.class);
|
||||
final WaiterResponse<?> waiterResponse = DefaultWaiterResponse.builder()
|
||||
.response(dummyDescribeTableResponse(TableStatus.ACTIVE))
|
||||
.attemptsExecuted(1)
|
||||
.build();
|
||||
when(waiter.waitUntilTableExists(any(Consumer.class), any(Consumer.class)))
|
||||
.thenReturn(CompletableFuture.completedFuture((WaiterResponse<DescribeTableResponse>) waiterResponse));
|
||||
|
||||
final DynamoDbAsyncClient dbAsyncClient = mock(DynamoDbAsyncClient.class);
|
||||
when(dbAsyncClient.waiter()).thenReturn(waiter);
|
||||
when(dbAsyncClient.createTable(any(CreateTableRequest.class)))
|
||||
.thenReturn(CompletableFuture.completedFuture(null));
|
||||
when(dbAsyncClient.updateContinuousBackups(any(UpdateContinuousBackupsRequest.class)))
|
||||
.thenReturn(CompletableFuture.completedFuture(null));
|
||||
when(dbAsyncClient.describeTable(any(DescribeTableRequest.class)))
|
||||
.thenThrow(ResourceNotFoundException.builder().build())
|
||||
.thenReturn(CompletableFuture.completedFuture(dummyDescribeTableResponse(TableStatus.CREATING)))
|
||||
.thenReturn(CompletableFuture.completedFuture(dummyDescribeTableResponse(TableStatus.ACTIVE)));
|
||||
|
||||
final ArgumentCaptor<CreateTableRequest> createTableRequestArgumentCaptor =
|
||||
ArgumentCaptor.forClass(CreateTableRequest.class);
|
||||
final ArgumentCaptor<UpdateContinuousBackupsRequest> updateContinuousBackupsRequestArgumentCaptor =
|
||||
ArgumentCaptor.forClass(UpdateContinuousBackupsRequest.class);
|
||||
|
||||
final WorkerMetricsTableConfig config = new WorkerMetricsTableConfig(null);
|
||||
config.tableName(TEST_WORKER_METRICS_TABLE);
|
||||
config.pointInTimeRecoveryEnabled(true);
|
||||
config.tags(
|
||||
Collections.singleton(Tag.builder().key("Key").value("Value").build()));
|
||||
setUp(config, dbAsyncClient);
|
||||
|
||||
verify(dbAsyncClient).createTable(createTableRequestArgumentCaptor.capture());
|
||||
verify(dbAsyncClient).updateContinuousBackups(updateContinuousBackupsRequestArgumentCaptor.capture());
|
||||
|
||||
assertEquals(1, createTableRequestArgumentCaptor.getValue().tags().size());
|
||||
assertEquals(
|
||||
"Key", createTableRequestArgumentCaptor.getValue().tags().get(0).key());
|
||||
assertEquals(
|
||||
"Value",
|
||||
createTableRequestArgumentCaptor.getValue().tags().get(0).value());
|
||||
|
||||
assertTrue(updateContinuousBackupsRequestArgumentCaptor
|
||||
.getAllValues()
|
||||
.get(0)
|
||||
.pointInTimeRecoverySpecification()
|
||||
.pointInTimeRecoveryEnabled());
|
||||
}
|
||||
|
||||
private static DescribeTableResponse dummyDescribeTableResponse(final TableStatus tableStatus) {
|
||||
return DescribeTableResponse.builder()
|
||||
.table(TableDescription.builder().tableStatus(tableStatus).build())
|
||||
.build();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
|||
Loading…
Reference in a new issue