From 1c0c41c4e879161ee63c34fb6f7508a2a1a8daae Mon Sep 17 00:00:00 2001
From: lucienlu-aws <132623944+lucienlu-aws@users.noreply.github.com>
Date: Mon, 15 Jul 2024 14:15:52 -0700
Subject: [PATCH] Add config to enable PITR (#1365)
* Add config to enable PITR
---
.../KinesisClientLibConfiguration.java | 2 +
.../config/MultiLangDaemonConfiguration.java | 6 ++
.../MultiLangDaemonConfigurationTest.java | 45 +++++++++++
.../kinesis/leases/LeaseManagementConfig.java | 16 +++-
.../DynamoDBLeaseManagementFactory.java | 76 ++++++++++++++++++-
.../dynamodb/DynamoDBLeaseRefresher.java | 69 ++++++++++++++++-
.../dynamodb/DynamoDBLeaseRefresherTest.java | 66 ++++++++++++++--
7 files changed, 268 insertions(+), 12 deletions(-)
diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java
index 179eb9e4..95c82569 100644
--- a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java
+++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java
@@ -218,6 +218,8 @@ public class KinesisClientLibConfiguration {
private AwsCredentialsProvider cloudWatchCredentialsProvider;
private long failoverTimeMillis;
private boolean enablePriorityLeaseAssignment;
+ private boolean leaseTableDeletionProtectionEnabled;
+ private boolean leaseTablePitrEnabled;
private String workerIdentifier;
private long shardSyncIntervalMillis;
private int maxRecords;
diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java
index 08c11d26..3336be88 100644
--- a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java
+++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java
@@ -88,6 +88,12 @@ public class MultiLangDaemonConfiguration {
@ConfigurationSettable(configurationClass = LeaseManagementConfig.class)
private Boolean enablePriorityLeaseAssignment;
+ @ConfigurationSettable(configurationClass = LeaseManagementConfig.class)
+ private Boolean leaseTableDeletionProtectionEnabled;
+
+ @ConfigurationSettable(configurationClass = LeaseManagementConfig.class)
+ private Boolean leaseTablePitrEnabled;
+
@ConfigurationSettable(configurationClass = LeaseManagementConfig.class)
private long shardSyncIntervalMillis;
diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfigurationTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfigurationTest.java
index b98db83a..1c45eb6e 100644
--- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfigurationTest.java
+++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfigurationTest.java
@@ -33,6 +33,7 @@ import software.amazon.kinesis.retrieval.polling.PollingConfig;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
@@ -99,6 +100,50 @@ public class MultiLangDaemonConfigurationTest {
assertThat(resolvedConfiguration.leaseManagementConfig.enablePriorityLeaseAssignment(), equalTo(false));
}
+ @Test
+ public void testSetLeaseTableDeletionProtectionEnabledToTrue() {
+ MultiLangDaemonConfiguration configuration = baseConfiguration();
+ configuration.setLeaseTableDeletionProtectionEnabled(true);
+
+ MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration =
+ configuration.resolvedConfiguration(shardRecordProcessorFactory);
+
+ assertTrue(resolvedConfiguration.leaseManagementConfig.leaseTableDeletionProtectionEnabled());
+ }
+
+ @Test
+ public void testSetLeaseTablePitrEnabledToTrue() {
+ MultiLangDaemonConfiguration configuration = baseConfiguration();
+ configuration.setLeaseTablePitrEnabled(true);
+
+ MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration =
+ configuration.resolvedConfiguration(shardRecordProcessorFactory);
+
+ assertTrue(resolvedConfiguration.leaseManagementConfig.leaseTablePitrEnabled());
+ }
+
+ @Test
+ public void testSetLeaseTableDeletionProtectionEnabledToFalse() {
+ MultiLangDaemonConfiguration configuration = baseConfiguration();
+ configuration.setLeaseTableDeletionProtectionEnabled(false);
+
+ MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration =
+ configuration.resolvedConfiguration(shardRecordProcessorFactory);
+
+ assertFalse(resolvedConfiguration.leaseManagementConfig.leaseTableDeletionProtectionEnabled());
+ }
+
+ @Test
+ public void testSetLeaseTablePitrEnabledToFalse() {
+ MultiLangDaemonConfiguration configuration = baseConfiguration();
+ configuration.setLeaseTablePitrEnabled(false);
+
+ MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration =
+ configuration.resolvedConfiguration(shardRecordProcessorFactory);
+
+ assertFalse(resolvedConfiguration.leaseManagementConfig.leaseTablePitrEnabled());
+ }
+
@Test
public void testDefaultRetrievalConfig() {
MultiLangDaemonConfiguration configuration = baseConfiguration();
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java
index c8c49a19..2d4e041c 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java
@@ -59,6 +59,8 @@ public class LeaseManagementConfig {
public static final long DEFAULT_GARBAGE_LEASE_CLEANUP_INTERVAL_MILLIS =
Duration.ofMinutes(30).toMillis();
public static final long DEFAULT_PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 2 * 60 * 1000L;
+ public static final boolean DEFAULT_LEASE_TABLE_DELETION_PROTECTION_ENABLED = false;
+ public static final boolean DEFAULT_LEASE_TABLE_PITR_ENABLED = false;
public static final boolean DEFAULT_ENABLE_PRIORITY_LEASE_ASSIGNMENT = true;
public static final int DEFAULT_CONSECUTIVE_HOLES_FOR_TRIGGERING_LEASE_RECOVERY = 3;
@@ -208,11 +210,20 @@ public class LeaseManagementConfig {
private BillingMode billingMode = BillingMode.PAY_PER_REQUEST;
/**
- * Whether to enabled deletion protection on the DynamoDB lease table created by KCL.
+ * Whether to enable deletion protection on the DynamoDB lease table created by KCL. This does not update
+ * already existing tables.
*
*
Default value: false
*/
- private boolean leaseTableDeletionProtectionEnabled = false;
+ private boolean leaseTableDeletionProtectionEnabled = DEFAULT_LEASE_TABLE_DELETION_PROTECTION_ENABLED;
+
+ /**
+ * Whether to enable PITR (point in time recovery) on the DynamoDB lease table created by KCL. If true, this can
+ * update existing table's PITR.
+ *
+ *
Default value: false
+ */
+ private boolean leaseTablePitrEnabled = DEFAULT_LEASE_TABLE_PITR_ENABLED;
/**
* The list of tags to be applied to the DynamoDB table created for lease management.
@@ -424,6 +435,7 @@ public class LeaseManagementConfig {
dynamoDbRequestTimeout(),
billingMode(),
leaseTableDeletionProtectionEnabled(),
+ leaseTablePitrEnabled(),
tags(),
leaseSerializer,
customShardDetectorProvider(),
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java
index 2eb3e707..e5435bfc 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java
@@ -99,6 +99,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
private final Duration dynamoDbRequestTimeout;
private final BillingMode billingMode;
private final boolean leaseTableDeletionProtectionEnabled;
+ private final boolean leaseTablePitrEnabled;
private final Collection tags;
private final boolean isMultiStreamMode;
private final LeaseCleanupConfig leaseCleanupConfig;
@@ -707,7 +708,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
tableCreatorCallback,
dynamoDbRequestTimeout,
billingMode,
- false,
+ LeaseManagementConfig.DEFAULT_LEASE_TABLE_DELETION_PROTECTION_ENABLED,
DefaultSdkAutoConstructList.getInstance(),
leaseSerializer);
}
@@ -945,6 +946,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
* @param isMultiStreamMode
* @param leaseCleanupConfig
*/
+ @Deprecated
public DynamoDBLeaseManagementFactory(
final KinesisAsyncClient kinesisClient,
final DynamoDbAsyncClient dynamoDBClient,
@@ -978,6 +980,76 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
Function customShardDetectorProvider,
boolean isMultiStreamMode,
LeaseCleanupConfig leaseCleanupConfig) {
+ this(
+ kinesisClient,
+ dynamoDBClient,
+ tableName,
+ workerIdentifier,
+ executorService,
+ failoverTimeMillis,
+ enablePriorityLeaseAssignment,
+ epsilonMillis,
+ maxLeasesForWorker,
+ maxLeasesToStealAtOneTime,
+ maxLeaseRenewalThreads,
+ cleanupLeasesUponShardCompletion,
+ ignoreUnexpectedChildShards,
+ shardSyncIntervalMillis,
+ consistentReads,
+ listShardsBackoffTimeMillis,
+ maxListShardsRetryAttempts,
+ maxCacheMissesBeforeReload,
+ listShardsCacheAllowedAgeInSeconds,
+ cacheMissWarningModulus,
+ initialLeaseTableReadCapacity,
+ initialLeaseTableWriteCapacity,
+ deprecatedHierarchicalShardSyncer,
+ tableCreatorCallback,
+ dynamoDbRequestTimeout,
+ billingMode,
+ leaseTableDeletionProtectionEnabled,
+ LeaseManagementConfig.DEFAULT_LEASE_TABLE_PITR_ENABLED,
+ tags,
+ leaseSerializer,
+ customShardDetectorProvider,
+ isMultiStreamMode,
+ leaseCleanupConfig);
+ }
+
+ public DynamoDBLeaseManagementFactory(
+ final KinesisAsyncClient kinesisClient,
+ final DynamoDbAsyncClient dynamoDBClient,
+ final String tableName,
+ final String workerIdentifier,
+ final ExecutorService executorService,
+ final long failoverTimeMillis,
+ final boolean enablePriorityLeaseAssignment,
+ final long epsilonMillis,
+ final int maxLeasesForWorker,
+ final int maxLeasesToStealAtOneTime,
+ final int maxLeaseRenewalThreads,
+ final boolean cleanupLeasesUponShardCompletion,
+ final boolean ignoreUnexpectedChildShards,
+ final long shardSyncIntervalMillis,
+ final boolean consistentReads,
+ final long listShardsBackoffTimeMillis,
+ final int maxListShardsRetryAttempts,
+ final int maxCacheMissesBeforeReload,
+ final long listShardsCacheAllowedAgeInSeconds,
+ final int cacheMissWarningModulus,
+ final long initialLeaseTableReadCapacity,
+ final long initialLeaseTableWriteCapacity,
+ final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer,
+ final TableCreatorCallback tableCreatorCallback,
+ Duration dynamoDbRequestTimeout,
+ BillingMode billingMode,
+ final boolean leaseTableDeletionProtectionEnabled,
+ final boolean leaseTablePitrEnabled,
+ Collection tags,
+ LeaseSerializer leaseSerializer,
+ Function customShardDetectorProvider,
+ boolean isMultiStreamMode,
+ LeaseCleanupConfig leaseCleanupConfig) {
this.kinesisClient = kinesisClient;
this.dynamoDBClient = dynamoDBClient;
this.tableName = tableName;
@@ -1005,6 +1077,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
this.dynamoDbRequestTimeout = dynamoDbRequestTimeout;
this.billingMode = billingMode;
this.leaseTableDeletionProtectionEnabled = leaseTableDeletionProtectionEnabled;
+ this.leaseTablePitrEnabled = leaseTablePitrEnabled;
this.leaseSerializer = leaseSerializer;
this.customShardDetectorProvider = customShardDetectorProvider;
this.isMultiStreamMode = isMultiStreamMode;
@@ -1091,6 +1164,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
dynamoDbRequestTimeout,
billingMode,
leaseTableDeletionProtectionEnabled,
+ leaseTablePitrEnabled,
tags);
}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java
index 838d2d15..123f4068 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java
@@ -49,6 +49,7 @@ import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
import software.amazon.awssdk.services.dynamodb.model.TableStatus;
import software.amazon.awssdk.services.dynamodb.model.Tag;
+import software.amazon.awssdk.services.dynamodb.model.UpdateContinuousBackupsRequest;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
@@ -81,6 +82,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
private final Duration dynamoDbRequestTimeout;
private final BillingMode billingMode;
private final boolean leaseTableDeletionProtectionEnabled;
+ private final boolean leaseTablePitrEnabled;
private final Collection tags;
private boolean newTableCreated = false;
@@ -159,7 +161,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
tableCreatorCallback,
dynamoDbRequestTimeout,
BillingMode.PAY_PER_REQUEST,
- false);
+ LeaseManagementConfig.DEFAULT_LEASE_TABLE_DELETION_PROTECTION_ENABLED);
}
/**
@@ -207,6 +209,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
* @param leaseTableDeletionProtectionEnabled
* @param tags
*/
+ @Deprecated
public DynamoDBLeaseRefresher(
final String table,
final DynamoDbAsyncClient dynamoDBClient,
@@ -217,6 +220,41 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
final BillingMode billingMode,
final boolean leaseTableDeletionProtectionEnabled,
final Collection tags) {
+ this(
+ table,
+ dynamoDBClient,
+ serializer,
+ consistentReads,
+ tableCreatorCallback,
+ dynamoDbRequestTimeout,
+ billingMode,
+ leaseTableDeletionProtectionEnabled,
+ LeaseManagementConfig.DEFAULT_LEASE_TABLE_PITR_ENABLED,
+ tags);
+ }
+
+ /**
+ * Constructor.
+ * @param table
+ * @param dynamoDBClient
+ * @param serializer
+ * @param consistentReads
+ * @param tableCreatorCallback
+ * @param dynamoDbRequestTimeout
+ * @param billingMode
+ * @param leaseTableDeletionProtectionEnabled
+ */
+ public DynamoDBLeaseRefresher(
+ final String table,
+ final DynamoDbAsyncClient dynamoDBClient,
+ final LeaseSerializer serializer,
+ final boolean consistentReads,
+ @NonNull final TableCreatorCallback tableCreatorCallback,
+ Duration dynamoDbRequestTimeout,
+ final BillingMode billingMode,
+ final boolean leaseTableDeletionProtectionEnabled,
+ final boolean leaseTablePitrEnabled,
+ final Collection tags) {
this.table = table;
this.dynamoDBClient = dynamoDBClient;
this.serializer = serializer;
@@ -225,6 +263,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
this.dynamoDbRequestTimeout = dynamoDbRequestTimeout;
this.billingMode = billingMode;
this.leaseTableDeletionProtectionEnabled = leaseTableDeletionProtectionEnabled;
+ this.leaseTablePitrEnabled = leaseTablePitrEnabled;
this.tags = tags;
}
@@ -252,7 +291,33 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
public boolean createLeaseTableIfNotExists() throws ProvisionedThroughputException, DependencyException {
final CreateTableRequest request = createTableRequestBuilder().build();
- return createTableIfNotExists(request);
+ boolean tableExists = createTableIfNotExists(request);
+
+ if (leaseTablePitrEnabled) {
+ enablePitr();
+ log.info("Enabled PITR on table {}", table);
+ }
+
+ return tableExists;
+ }
+
+ private void enablePitr() throws DependencyException {
+ final UpdateContinuousBackupsRequest request = UpdateContinuousBackupsRequest.builder()
+ .tableName(table)
+ .pointInTimeRecoverySpecification(builder -> builder.pointInTimeRecoveryEnabled(true))
+ .build();
+
+ final AWSExceptionManager exceptionManager = createExceptionManager();
+ exceptionManager.add(ResourceNotFoundException.class, t -> t);
+ exceptionManager.add(ProvisionedThroughputExceededException.class, t -> t);
+
+ try {
+ FutureUtils.resolveOrCancelFuture(dynamoDBClient.updateContinuousBackups(request), dynamoDbRequestTimeout);
+ } catch (ExecutionException e) {
+ throw exceptionManager.apply(e.getCause());
+ } catch (InterruptedException | DynamoDbException | TimeoutException e) {
+ throw new DependencyException(e);
+ }
}
private boolean createTableIfNotExists(CreateTableRequest request)
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java
index e757c93b..2668918c 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java
@@ -31,6 +31,7 @@ import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
+import software.amazon.awssdk.core.util.DefaultSdkAutoConstructList;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
@@ -54,6 +55,8 @@ import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
import software.amazon.awssdk.services.dynamodb.model.TableDescription;
import software.amazon.awssdk.services.dynamodb.model.TableStatus;
import software.amazon.awssdk.services.dynamodb.model.Tag;
+import software.amazon.awssdk.services.dynamodb.model.UpdateContinuousBackupsRequest;
+import software.amazon.awssdk.services.dynamodb.model.UpdateContinuousBackupsResponse;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemResponse;
import software.amazon.kinesis.leases.Lease;
@@ -80,6 +83,10 @@ public class DynamoDBLeaseRefresherTest {
private static final String TABLE_NAME = "test";
private static final boolean CONSISTENT_READS = true;
private static final boolean DELETION_PROTECTION_ENABLED = false;
+ private static final boolean PITR_ENABLED = true;
+ private static final Collection EMPTY_TAGS = DefaultSdkAutoConstructList.getInstance();
+ private static final Collection TAGS =
+ Collections.singletonList(Tag.builder().key("foo").value("bar").build());
@Mock
private DynamoDbAsyncClient dynamoDbClient;
@@ -111,6 +118,9 @@ public class DynamoDBLeaseRefresherTest {
@Mock
private CompletableFuture mockCreateTableFuture;
+ @Mock
+ private CompletableFuture mockUpdateContinuousBackupsFuture;
+
@Mock
private Lease lease;
@@ -120,8 +130,7 @@ public class DynamoDBLeaseRefresherTest {
private DynamoDBLeaseRefresher leaseRefresher;
private DescribeTableRequest describeTableRequest;
private CreateTableRequest createTableRequest;
- private Collection tags;
-
+ private UpdateContinuousBackupsRequest updateContinuousBackupsRequest;
private Map serializedLease;
@Before
@@ -139,6 +148,10 @@ public class DynamoDBLeaseRefresherTest {
.billingMode(BillingMode.PAY_PER_REQUEST)
.deletionProtectionEnabled(DELETION_PROTECTION_ENABLED)
.build();
+ updateContinuousBackupsRequest = UpdateContinuousBackupsRequest.builder()
+ .tableName(TABLE_NAME)
+ .pointInTimeRecoverySpecification(builder -> builder.pointInTimeRecoveryEnabled(PITR_ENABLED))
+ .build();
}
@Test
@@ -353,7 +366,6 @@ public class DynamoDBLeaseRefresherTest {
@Test
public void testCreateLeaseTableWithTagsIfNotExists() throws Exception {
- tags = Collections.singletonList(Tag.builder().key("foo").value("bar").build());
leaseRefresher = new DynamoDBLeaseRefresher(
TABLE_NAME,
dynamoDbClient,
@@ -363,7 +375,7 @@ public class DynamoDBLeaseRefresherTest {
LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT,
BillingMode.PROVISIONED,
DELETION_PROTECTION_ENABLED,
- tags);
+ TAGS);
when(dynamoDbClient.describeTable(describeTableRequest)).thenReturn(mockDescribeTableFuture);
when(mockDescribeTableFuture.get(
@@ -382,7 +394,7 @@ public class DynamoDBLeaseRefresherTest {
.attributeDefinitions(leaseSerializer.getAttributeDefinitions())
.provisionedThroughput(throughput)
.deletionProtectionEnabled(DELETION_PROTECTION_ENABLED)
- .tags(tags)
+ .tags(TAGS)
.build();
when(dynamoDbClient.createTable(createTableRequest)).thenReturn(mockCreateTableFuture);
when(mockCreateTableFuture.get(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS))
@@ -423,9 +435,49 @@ public class DynamoDBLeaseRefresherTest {
Assert.assertTrue(result);
}
+ @Test
+ public void testCreateLeaseTableIfNotExistsWithPitrEnabled() throws Exception {
+ DynamoDBLeaseRefresher leaseRefresherWithEnabledPitr = new DynamoDBLeaseRefresher(
+ TABLE_NAME,
+ dynamoDbClient,
+ leaseSerializer,
+ CONSISTENT_READS,
+ tableCreatorCallback,
+ LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT,
+ BillingMode.PAY_PER_REQUEST,
+ DELETION_PROTECTION_ENABLED,
+ PITR_ENABLED,
+ EMPTY_TAGS);
+ when(dynamoDbClient.describeTable(describeTableRequest)).thenReturn(mockDescribeTableFuture);
+ when(mockDescribeTableFuture.get(
+ eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS)))
+ .thenThrow(ResourceNotFoundException.builder()
+ .message("Table doesn't exist")
+ .build());
+ when(dynamoDbClient.createTable(createTableRequest)).thenReturn(mockCreateTableFuture);
+ when(mockCreateTableFuture.get(
+ eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS)))
+ .thenReturn(null);
+ when(dynamoDbClient.updateContinuousBackups(updateContinuousBackupsRequest))
+ .thenReturn(mockUpdateContinuousBackupsFuture);
+ when(mockUpdateContinuousBackupsFuture.get(
+ eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS)))
+ .thenReturn(null);
+ final boolean result = leaseRefresherWithEnabledPitr.createLeaseTableIfNotExists();
+
+ verify(dynamoDbClient, times(1)).describeTable(describeTableRequest);
+ verify(dynamoDbClient, times(1)).createTable(createTableRequest);
+ verify(dynamoDbClient, times(1)).updateContinuousBackups(updateContinuousBackupsRequest);
+ verify(mockDescribeTableFuture, times(1))
+ .get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS));
+ verify(mockCreateTableFuture, times(1))
+ .get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS));
+ Assert.assertTrue(result);
+ }
+
@Test
public void testCreateLeaseTableProvisionedWithDeletionProtectionIfNotExists() throws Exception {
- leaseRefresher = new DynamoDBLeaseRefresher(
+ DynamoDBLeaseRefresher leaseRefresherWithEnabledDeletionProtection = new DynamoDBLeaseRefresher(
TABLE_NAME,
dynamoDbClient,
leaseSerializer,
@@ -458,7 +510,7 @@ public class DynamoDBLeaseRefresherTest {
eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS)))
.thenReturn(null);
- final boolean result = leaseRefresher.createLeaseTableIfNotExists(10L, 10L);
+ final boolean result = leaseRefresherWithEnabledDeletionProtection.createLeaseTableIfNotExists(10L, 10L);
verify(dynamoDbClient, times(1)).describeTable(describeTableRequest);
verify(dynamoDbClient, times(1)).createTable(createTableRequest);