diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java
index 9b95f562..1fecbe7c 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java
@@ -194,6 +194,13 @@ public class LeaseManagementConfig {
private BillingMode billingMode = BillingMode.PAY_PER_REQUEST;
+ /**
+ * Whether to enabled deletion protection on the DynamoDB lease table created by KCL.
+ *
+ *
Default value: false
+ */
+ private boolean leaseTableDeletionProtectionEnabled = false;
+
/**
* The list of tags to be applied to the DynamoDB table created for lease management.
*
@@ -382,6 +389,7 @@ public class LeaseManagementConfig {
tableCreatorCallback(),
dynamoDbRequestTimeout(),
billingMode(),
+ leaseTableDeletionProtectionEnabled(),
tags(),
leaseSerializer,
customShardDetectorProvider(),
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java
index 8b60f6dd..f11e2f0a 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java
@@ -89,6 +89,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
private final TableCreatorCallback tableCreatorCallback;
private final Duration dynamoDbRequestTimeout;
private final BillingMode billingMode;
+ private final boolean leaseTableDeletionProtectionEnabled;
private final Collection tags;
private final boolean isMultiStreamMode;
private final LeaseCleanupConfig leaseCleanupConfig;
@@ -450,7 +451,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity,
- deprecatedHierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode,
+ deprecatedHierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode, false,
DefaultSdkAutoConstructList.getInstance(), leaseSerializer);
}
@@ -483,6 +484,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
* @param tableCreatorCallback
* @param dynamoDbRequestTimeout
* @param billingMode
+ * @param leaseTableDeletionProtectionEnabled
* @param tags
*/
private DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final StreamConfig streamConfig,
@@ -495,15 +497,17 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus,
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity,
final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
- Duration dynamoDbRequestTimeout, BillingMode billingMode, Collection tags, LeaseSerializer leaseSerializer) {
+ Duration dynamoDbRequestTimeout, BillingMode billingMode, final boolean leaseTableDeletionProtectionEnabled,
+ Collection tags, LeaseSerializer leaseSerializer) {
this(kinesisClient, dynamoDBClient, tableName,
workerIdentifier, executorService, failoverTimeMillis, epsilonMillis, maxLeasesForWorker,
maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity,
- deprecatedHierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode, tags, leaseSerializer,
- null, false, LeaseManagementConfig.DEFAULT_LEASE_CLEANUP_CONFIG);
+ deprecatedHierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode,
+ leaseTableDeletionProtectionEnabled, tags, leaseSerializer, null, false,
+ LeaseManagementConfig.DEFAULT_LEASE_CLEANUP_CONFIG);
this.streamConfig = streamConfig;
}
@@ -534,6 +538,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
* @param tableCreatorCallback
* @param dynamoDbRequestTimeout
* @param billingMode
+ * @param leaseTableDeletionProtectionEnabled
* @param leaseSerializer
* @param customShardDetectorProvider
* @param isMultiStreamMode
@@ -549,7 +554,8 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus,
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity,
final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
- Duration dynamoDbRequestTimeout, BillingMode billingMode, Collection tags, LeaseSerializer leaseSerializer,
+ Duration dynamoDbRequestTimeout, BillingMode billingMode, final boolean leaseTableDeletionProtectionEnabled,
+ Collection tags, LeaseSerializer leaseSerializer,
Function customShardDetectorProvider, boolean isMultiStreamMode,
LeaseCleanupConfig leaseCleanupConfig) {
this.kinesisClient = kinesisClient;
@@ -577,6 +583,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
this.tableCreatorCallback = tableCreatorCallback;
this.dynamoDbRequestTimeout = dynamoDbRequestTimeout;
this.billingMode = billingMode;
+ this.leaseTableDeletionProtectionEnabled = leaseTableDeletionProtectionEnabled;
this.leaseSerializer = leaseSerializer;
this.customShardDetectorProvider = customShardDetectorProvider;
this.isMultiStreamMode = isMultiStreamMode;
@@ -648,7 +655,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
@Override
public DynamoDBLeaseRefresher createLeaseRefresher() {
return new DynamoDBLeaseRefresher(tableName, dynamoDBClient, leaseSerializer, consistentReads,
- tableCreatorCallback, dynamoDbRequestTimeout, billingMode, tags);
+ tableCreatorCallback, dynamoDbRequestTimeout, billingMode, leaseTableDeletionProtectionEnabled, tags);
}
@Override
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java
index c87f3eb8..11807b9c 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java
@@ -80,6 +80,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
private final Duration dynamoDbRequestTimeout;
private final BillingMode billingMode;
+ private final boolean leaseTableDeletionProtectionEnabled;
private final Collection tags;
private boolean newTableCreated = false;
@@ -134,7 +135,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
public DynamoDBLeaseRefresher(final String table, final DynamoDbAsyncClient dynamoDBClient,
final LeaseSerializer serializer, final boolean consistentReads,
@NonNull final TableCreatorCallback tableCreatorCallback, Duration dynamoDbRequestTimeout) {
- this(table, dynamoDBClient, serializer, consistentReads, tableCreatorCallback, dynamoDbRequestTimeout, BillingMode.PAY_PER_REQUEST);
+ this(table, dynamoDBClient, serializer, consistentReads, tableCreatorCallback, dynamoDbRequestTimeout, BillingMode.PAY_PER_REQUEST, false);
}
/**
@@ -146,14 +147,15 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
* @param tableCreatorCallback
* @param dynamoDbRequestTimeout
* @param billingMode
+ * @param leaseTableDeletionProtectionEnabled
*/
@Deprecated
public DynamoDBLeaseRefresher(final String table, final DynamoDbAsyncClient dynamoDBClient,
final LeaseSerializer serializer, final boolean consistentReads,
@NonNull final TableCreatorCallback tableCreatorCallback, Duration dynamoDbRequestTimeout,
- final BillingMode billingMode) {
+ final BillingMode billingMode, final boolean leaseTableDeletionProtectionEnabled) {
this(table, dynamoDBClient, serializer, consistentReads, tableCreatorCallback, dynamoDbRequestTimeout,
- billingMode, DefaultSdkAutoConstructList.getInstance());
+ billingMode, leaseTableDeletionProtectionEnabled, DefaultSdkAutoConstructList.getInstance());
}
/**
@@ -165,12 +167,14 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
* @param tableCreatorCallback
* @param dynamoDbRequestTimeout
* @param billingMode
+ * @param leaseTableDeletionProtectionEnabled
* @param tags
*/
public DynamoDBLeaseRefresher(final String table, final DynamoDbAsyncClient dynamoDBClient,
final LeaseSerializer serializer, final boolean consistentReads,
@NonNull final TableCreatorCallback tableCreatorCallback, Duration dynamoDbRequestTimeout,
- final BillingMode billingMode, final Collection tags) {
+ final BillingMode billingMode, final boolean leaseTableDeletionProtectionEnabled,
+ final Collection tags) {
this.table = table;
this.dynamoDBClient = dynamoDBClient;
this.serializer = serializer;
@@ -178,6 +182,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
this.tableCreatorCallback = tableCreatorCallback;
this.dynamoDbRequestTimeout = dynamoDbRequestTimeout;
this.billingMode = billingMode;
+ this.leaseTableDeletionProtectionEnabled = leaseTableDeletionProtectionEnabled;
this.tags = tags;
}
@@ -806,6 +811,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
private CreateTableRequest.Builder createTableRequestBuilder() {
final CreateTableRequest.Builder builder = CreateTableRequest.builder().tableName(table).keySchema(serializer.getKeySchema())
.attributeDefinitions(serializer.getAttributeDefinitions())
+ .deletionProtectionEnabled(leaseTableDeletionProtectionEnabled)
.tags(tags);
if (BillingMode.PAY_PER_REQUEST.equals(billingMode)) {
builder.billingMode(billingMode);
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseIntegrationBillingModePayPerRequestTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseIntegrationBillingModePayPerRequestTest.java
index 128d347a..c9b79189 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseIntegrationBillingModePayPerRequestTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseIntegrationBillingModePayPerRequestTest.java
@@ -23,6 +23,6 @@ public class LeaseIntegrationBillingModePayPerRequestTest extends LeaseIntegrati
@Override
protected DynamoDBLeaseRefresher getLeaseRefresher() {
return new DynamoDBLeaseRefresher(tableName+"Per-Request", ddbClient, leaseSerializer, true,
- tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, BillingMode.PAY_PER_REQUEST);
+ tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, BillingMode.PAY_PER_REQUEST, false);
}
}
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseIntegrationTest.java
index fd5106e4..66b221c8 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseIntegrationTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseIntegrationTest.java
@@ -74,7 +74,7 @@ public class LeaseIntegrationTest {
protected DynamoDBLeaseRefresher getLeaseRefresher() {
return new DynamoDBLeaseRefresher(tableName, ddbClient, leaseSerializer, true,
- tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, BillingMode.PAY_PER_REQUEST);
+ tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, BillingMode.PAY_PER_REQUEST, false);
}
}
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java
index 102a9f17..6daa14a3 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java
@@ -81,6 +81,7 @@ public class DynamoDBLeaseRefresherTest {
private static final String TABLE_NAME = "test";
private static final boolean CONSISTENT_READS = true;
+ private static final boolean DELETION_PROTECTION_ENABLED = false;
@Mock
private DynamoDbAsyncClient dynamoDbClient;
@@ -127,6 +128,7 @@ public class DynamoDBLeaseRefresherTest {
.keySchema(leaseSerializer.getKeySchema())
.attributeDefinitions(leaseSerializer.getAttributeDefinitions())
.billingMode(BillingMode.PAY_PER_REQUEST)
+ .deletionProtectionEnabled(DELETION_PROTECTION_ENABLED)
.build();
}
@@ -286,7 +288,7 @@ public class DynamoDBLeaseRefresherTest {
@Test
public void testCreateLeaseTableProvisionedBillingModeIfNotExists() throws Exception {
leaseRefresher = new DynamoDBLeaseRefresher(TABLE_NAME, dynamoDbClient, leaseSerializer, CONSISTENT_READS,
- tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, BillingMode.PROVISIONED);
+ tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, BillingMode.PROVISIONED, DELETION_PROTECTION_ENABLED);
when(dynamoDbClient.describeTable(describeTableRequest)).thenReturn(mockDescribeTableFuture);
when(mockDescribeTableFuture.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS)))
@@ -299,6 +301,7 @@ public class DynamoDBLeaseRefresherTest {
.keySchema(leaseSerializer.getKeySchema())
.attributeDefinitions(leaseSerializer.getAttributeDefinitions())
.provisionedThroughput(throughput)
+ .deletionProtectionEnabled(DELETION_PROTECTION_ENABLED)
.build();
when(dynamoDbClient.createTable(createTableRequest)).thenReturn(mockCreateTableFuture);
when(mockCreateTableFuture.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS)))
@@ -319,7 +322,7 @@ public class DynamoDBLeaseRefresherTest {
public void testCreateLeaseTableWithTagsIfNotExists() throws Exception {
tags = Collections.singletonList(Tag.builder().key("foo").value("bar").build());
leaseRefresher = new DynamoDBLeaseRefresher(TABLE_NAME, dynamoDbClient, leaseSerializer, CONSISTENT_READS,
- tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, BillingMode.PROVISIONED, tags);
+ tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, BillingMode.PROVISIONED, DELETION_PROTECTION_ENABLED, tags);
when(dynamoDbClient.describeTable(describeTableRequest)).thenReturn(mockDescribeTableFuture);
when(mockDescribeTableFuture.get(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS))
@@ -332,6 +335,7 @@ public class DynamoDBLeaseRefresherTest {
.keySchema(leaseSerializer.getKeySchema())
.attributeDefinitions(leaseSerializer.getAttributeDefinitions())
.provisionedThroughput(throughput)
+ .deletionProtectionEnabled(DELETION_PROTECTION_ENABLED)
.tags(tags)
.build();
when(dynamoDbClient.createTable(createTableRequest)).thenReturn(mockCreateTableFuture);
@@ -369,6 +373,39 @@ public class DynamoDBLeaseRefresherTest {
Assert.assertTrue(result);
}
+ @Test
+ public void testCreateLeaseTableProvisionedWithDeletionProtectionIfNotExists() throws Exception {
+ leaseRefresher = new DynamoDBLeaseRefresher(TABLE_NAME, dynamoDbClient, leaseSerializer, CONSISTENT_READS,
+ tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, BillingMode.PROVISIONED, true);
+
+ when(dynamoDbClient.describeTable(describeTableRequest)).thenReturn(mockDescribeTableFuture);
+ when(mockDescribeTableFuture.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS)))
+ .thenThrow(ResourceNotFoundException.builder().message("Table doesn't exist").build());
+
+ final ProvisionedThroughput throughput = ProvisionedThroughput.builder().readCapacityUnits(10L)
+ .writeCapacityUnits(10L).build();
+ final CreateTableRequest createTableRequest = CreateTableRequest.builder()
+ .tableName(TABLE_NAME)
+ .keySchema(leaseSerializer.getKeySchema())
+ .attributeDefinitions(leaseSerializer.getAttributeDefinitions())
+ .provisionedThroughput(throughput)
+ .deletionProtectionEnabled(true)
+ .build();
+ when(dynamoDbClient.createTable(createTableRequest)).thenReturn(mockCreateTableFuture);
+ when(mockCreateTableFuture.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS)))
+ .thenReturn(null);
+
+ final boolean result = leaseRefresher.createLeaseTableIfNotExists(10L, 10L);
+
+ verify(dynamoDbClient, times(1)).describeTable(describeTableRequest);
+ verify(dynamoDbClient, times(1)).createTable(createTableRequest);
+ verify(mockDescribeTableFuture, times(1))
+ .get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS));
+ verify(mockCreateTableFuture, times(1))
+ .get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS));
+ Assert.assertTrue(result);
+ }
+
@Test
public void testCreateLeaseTableIfNotExists_throwsDependencyException() throws Exception {
when(dynamoDbClient.describeTable(describeTableRequest)).thenReturn(mockDescribeTableFuture);
@@ -462,7 +499,7 @@ public class DynamoDBLeaseRefresherTest {
@Test
public void testCreateLeaseTableProvisionedBillingModeTimesOut() throws Exception {
leaseRefresher = new DynamoDBLeaseRefresher(TABLE_NAME, dynamoDbClient, leaseSerializer, CONSISTENT_READS,
- tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, BillingMode.PROVISIONED);
+ tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, BillingMode.PROVISIONED, false);
TimeoutException te = setRuleForDependencyTimeout();
when(dynamoDbClient.describeTable(any(DescribeTableRequest.class))).thenReturn(mockDescribeTableFuture);