Allow tags to be added when lease table is created (#1065)

* Allow tags to be added when lease table is created

* Add javadoc comment to new member variable

* DRY up creation of the CreateTable Request builder

* Fix compiler error

* Remove unnecessary eq functions

* Fix indentation

* Add patch
This commit is contained in:
Ryan French 2023-03-23 17:26:07 +00:00 committed by GitHub
parent 0627ba50bb
commit 2ecd1c4ac5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 207 additions and 19 deletions

View file

@ -18,6 +18,7 @@ package software.amazon.kinesis.leases;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.time.Duration;
import java.util.Collection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
@ -28,8 +29,11 @@ import lombok.Data;
import lombok.NonNull;
import lombok.experimental.Accessors;
import org.apache.commons.lang3.Validate;
import software.amazon.awssdk.core.util.DefaultSdkAutoConstructList;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.dynamodb.model.Tag;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
@ -190,6 +194,13 @@ public class LeaseManagementConfig {
private BillingMode billingMode = BillingMode.PAY_PER_REQUEST;
/**
* The list of tags to be applied to the DynamoDB table created for lease management.
*
* <p>Default value: {@link DefaultSdkAutoConstructList}
*/
private Collection<Tag> tags = DefaultSdkAutoConstructList.getInstance();
/**
* Frequency (in millis) of the auditor job to scan for partial leases in the lease table.
* If the auditor detects any hole in the leases for a stream, then it would trigger shard sync based on
@ -333,7 +344,7 @@ public class LeaseManagementConfig {
initialLeaseTableReadCapacity(),
initialLeaseTableWriteCapacity(),
hierarchicalShardSyncer(),
tableCreatorCallback(), dynamoDbRequestTimeout(), billingMode());
tableCreatorCallback(), dynamoDbRequestTimeout(), billingMode(), tags());
}
return leaseManagementFactory;
}
@ -371,6 +382,7 @@ public class LeaseManagementConfig {
tableCreatorCallback(),
dynamoDbRequestTimeout(),
billingMode(),
tags(),
leaseSerializer,
customShardDetectorProvider(),
isMultiStreamingMode,

View file

@ -16,13 +16,16 @@
package software.amazon.kinesis.leases.dynamodb;
import java.time.Duration;
import java.util.Collection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import lombok.Data;
import lombok.NonNull;
import software.amazon.awssdk.core.util.DefaultSdkAutoConstructList;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.dynamodb.model.Tag;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
@ -86,6 +89,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
private final TableCreatorCallback tableCreatorCallback;
private final Duration dynamoDbRequestTimeout;
private final BillingMode billingMode;
private final Collection<Tag> tags;
private final boolean isMultiStreamMode;
private final LeaseCleanupConfig leaseCleanupConfig;
@ -343,6 +347,61 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
hierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode, new DynamoDBLeaseSerializer());
}
/**
* Constructor.
*
* @param kinesisClient
* @param streamName
* @param dynamoDBClient
* @param tableName
* @param workerIdentifier
* @param executorService
* @param initialPositionInStream
* @param failoverTimeMillis
* @param epsilonMillis
* @param maxLeasesForWorker
* @param maxLeasesToStealAtOneTime
* @param maxLeaseRenewalThreads
* @param cleanupLeasesUponShardCompletion
* @param ignoreUnexpectedChildShards
* @param shardSyncIntervalMillis
* @param consistentReads
* @param listShardsBackoffTimeMillis
* @param maxListShardsRetryAttempts
* @param maxCacheMissesBeforeReload
* @param listShardsCacheAllowedAgeInSeconds
* @param cacheMissWarningModulus
* @param initialLeaseTableReadCapacity
* @param initialLeaseTableWriteCapacity
* @param hierarchicalShardSyncer
* @param tableCreatorCallback
* @param dynamoDbRequestTimeout
* @param billingMode
* @param tags
*/
@Deprecated
public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final String streamName,
final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier,
final ExecutorService executorService, final InitialPositionInStreamExtended initialPositionInStream,
final long failoverTimeMillis, final long epsilonMillis, final int maxLeasesForWorker,
final int maxLeasesToStealAtOneTime, final int maxLeaseRenewalThreads,
final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards,
final long shardSyncIntervalMillis, final boolean consistentReads, final long listShardsBackoffTimeMillis,
final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload,
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus,
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity,
final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
Duration dynamoDbRequestTimeout, BillingMode billingMode, Collection<Tag> tags) {
this(kinesisClient, new StreamConfig(StreamIdentifier.singleStreamInstance(streamName), initialPositionInStream), dynamoDBClient, tableName,
workerIdentifier, executorService, failoverTimeMillis, epsilonMillis, maxLeasesForWorker,
maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity,
hierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode, new DynamoDBLeaseSerializer());
}
/**
* Constructor.
*
@ -373,6 +432,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
* @param dynamoDbRequestTimeout
* @param billingMode
*/
@Deprecated
private DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final StreamConfig streamConfig,
final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier,
final ExecutorService executorService, final long failoverTimeMillis, final long epsilonMillis,
@ -384,13 +444,65 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity,
final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
Duration dynamoDbRequestTimeout, BillingMode billingMode, LeaseSerializer leaseSerializer) {
this(kinesisClient, streamConfig, dynamoDBClient, tableName,
workerIdentifier, executorService, failoverTimeMillis, epsilonMillis, maxLeasesForWorker,
maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity,
deprecatedHierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode,
DefaultSdkAutoConstructList.getInstance(), leaseSerializer);
}
/**
* Constructor.
*
* @param kinesisClient
* @param streamConfig
* @param dynamoDBClient
* @param tableName
* @param workerIdentifier
* @param executorService
* @param failoverTimeMillis
* @param epsilonMillis
* @param maxLeasesForWorker
* @param maxLeasesToStealAtOneTime
* @param maxLeaseRenewalThreads
* @param cleanupLeasesUponShardCompletion
* @param ignoreUnexpectedChildShards
* @param shardSyncIntervalMillis
* @param consistentReads
* @param listShardsBackoffTimeMillis
* @param maxListShardsRetryAttempts
* @param maxCacheMissesBeforeReload
* @param listShardsCacheAllowedAgeInSeconds
* @param cacheMissWarningModulus
* @param initialLeaseTableReadCapacity
* @param initialLeaseTableWriteCapacity
* @param deprecatedHierarchicalShardSyncer
* @param tableCreatorCallback
* @param dynamoDbRequestTimeout
* @param billingMode
* @param tags
*/
private DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final StreamConfig streamConfig,
final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier,
final ExecutorService executorService, final long failoverTimeMillis, final long epsilonMillis,
final int maxLeasesForWorker, final int maxLeasesToStealAtOneTime, final int maxLeaseRenewalThreads,
final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards,
final long shardSyncIntervalMillis, final boolean consistentReads, final long listShardsBackoffTimeMillis,
final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload,
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus,
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity,
final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
Duration dynamoDbRequestTimeout, BillingMode billingMode, Collection<Tag> tags, LeaseSerializer leaseSerializer) {
this(kinesisClient, dynamoDBClient, tableName,
workerIdentifier, executorService, failoverTimeMillis, epsilonMillis, maxLeasesForWorker,
maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity,
deprecatedHierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode, leaseSerializer,
deprecatedHierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode, tags, leaseSerializer,
null, false, LeaseManagementConfig.DEFAULT_LEASE_CLEANUP_CONFIG);
this.streamConfig = streamConfig;
}
@ -437,7 +549,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus,
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity,
final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
Duration dynamoDbRequestTimeout, BillingMode billingMode, LeaseSerializer leaseSerializer,
Duration dynamoDbRequestTimeout, BillingMode billingMode, Collection<Tag> tags, LeaseSerializer leaseSerializer,
Function<StreamConfig, ShardDetector> customShardDetectorProvider, boolean isMultiStreamMode,
LeaseCleanupConfig leaseCleanupConfig) {
this.kinesisClient = kinesisClient;
@ -469,6 +581,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
this.customShardDetectorProvider = customShardDetectorProvider;
this.isMultiStreamMode = isMultiStreamMode;
this.leaseCleanupConfig = leaseCleanupConfig;
this.tags = tags;
}
@Override
@ -535,7 +648,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
@Override
public DynamoDBLeaseRefresher createLeaseRefresher() {
return new DynamoDBLeaseRefresher(tableName, dynamoDBClient, leaseSerializer, consistentReads,
tableCreatorCallback, dynamoDbRequestTimeout, billingMode);
tableCreatorCallback, dynamoDbRequestTimeout, billingMode, tags);
}
@Override

View file

@ -18,6 +18,7 @@ import com.google.common.collect.ImmutableMap;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
@ -25,6 +26,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.core.util.DefaultSdkAutoConstructList;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.AttributeValueUpdate;
@ -46,6 +48,7 @@ import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
import software.amazon.awssdk.services.dynamodb.model.TableStatus;
import software.amazon.awssdk.services.dynamodb.model.Tag;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
@ -77,6 +80,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
private final Duration dynamoDbRequestTimeout;
private final BillingMode billingMode;
private final Collection<Tag> tags;
private boolean newTableCreated = false;
@ -143,10 +147,29 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
* @param dynamoDbRequestTimeout
* @param billingMode
*/
@Deprecated
public DynamoDBLeaseRefresher(final String table, final DynamoDbAsyncClient dynamoDBClient,
final LeaseSerializer serializer, final boolean consistentReads,
@NonNull final TableCreatorCallback tableCreatorCallback, Duration dynamoDbRequestTimeout,
final BillingMode billingMode) {
this(table, dynamoDBClient, serializer, consistentReads, tableCreatorCallback, dynamoDbRequestTimeout, billingMode, DefaultSdkAutoConstructList.getInstance());
}
/**
* Constructor.
* @param table
* @param dynamoDBClient
* @param serializer
* @param consistentReads
* @param tableCreatorCallback
* @param dynamoDbRequestTimeout
* @param billingMode
* @param tags
*/
public DynamoDBLeaseRefresher(final String table, final DynamoDbAsyncClient dynamoDBClient,
final LeaseSerializer serializer, final boolean consistentReads,
@NonNull final TableCreatorCallback tableCreatorCallback, Duration dynamoDbRequestTimeout,
final BillingMode billingMode, final Collection<Tag> tags) {
this.table = table;
this.dynamoDBClient = dynamoDBClient;
this.serializer = serializer;
@ -154,6 +177,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
this.tableCreatorCallback = tableCreatorCallback;
this.dynamoDbRequestTimeout = dynamoDbRequestTimeout;
this.billingMode = billingMode;
this.tags = tags;
}
/**
@ -162,20 +186,13 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
@Override
public boolean createLeaseTableIfNotExists(@NonNull final Long readCapacity, @NonNull final Long writeCapacity)
throws ProvisionedThroughputException, DependencyException {
ProvisionedThroughput throughput = ProvisionedThroughput.builder().readCapacityUnits(readCapacity)
final CreateTableRequest.Builder builder = createTableRequestBuilder();
if(BillingMode.PROVISIONED.equals(billingMode)) {
ProvisionedThroughput throughput = ProvisionedThroughput.builder().readCapacityUnits(readCapacity)
.writeCapacityUnits(writeCapacity).build();
final CreateTableRequest request;
if(BillingMode.PAY_PER_REQUEST.equals(billingMode)){
request = CreateTableRequest.builder().tableName(table).keySchema(serializer.getKeySchema())
.attributeDefinitions(serializer.getAttributeDefinitions())
.billingMode(billingMode).build();
} else {
request = CreateTableRequest.builder().tableName(table).keySchema(serializer.getKeySchema())
.attributeDefinitions(serializer.getAttributeDefinitions()).provisionedThroughput(throughput)
.build();
builder.provisionedThroughput(throughput);
}
return createTableIfNotExists(request);
return createTableIfNotExists(builder.build());
}
/**
@ -184,9 +201,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
@Override
public boolean createLeaseTableIfNotExists()
throws ProvisionedThroughputException, DependencyException {
final CreateTableRequest request = CreateTableRequest.builder().tableName(table).keySchema(serializer.getKeySchema())
.attributeDefinitions(serializer.getAttributeDefinitions())
.billingMode(billingMode).build();
final CreateTableRequest request = createTableRequestBuilder().build();
return createTableIfNotExists(request);
}
@ -787,6 +802,16 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
}
}
private CreateTableRequest.Builder createTableRequestBuilder() {
final CreateTableRequest.Builder builder = CreateTableRequest.builder().tableName(table).keySchema(serializer.getKeySchema())
.attributeDefinitions(serializer.getAttributeDefinitions())
.tags(tags);
if (BillingMode.PAY_PER_REQUEST.equals(billingMode)) {
builder.billingMode(billingMode);
}
return builder;
}
private AWSExceptionManager createExceptionManager() {
final AWSExceptionManager exceptionManager = new AWSExceptionManager();
exceptionManager.add(DynamoDbException.class, t -> t);

View file

@ -26,6 +26,8 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@ -66,6 +68,7 @@ import software.amazon.awssdk.services.dynamodb.model.TableDescription;
import software.amazon.awssdk.services.dynamodb.model.TableStatus;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemResponse;
import software.amazon.awssdk.services.dynamodb.model.Tag;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.kinesis.leases.Lease;
@ -109,6 +112,7 @@ public class DynamoDBLeaseRefresherTest {
private DynamoDBLeaseRefresher leaseRefresher;
private DescribeTableRequest describeTableRequest;
private CreateTableRequest createTableRequest;
private Collection<Tag> tags;
private Map<String, AttributeValue> serializedLease;
@ -313,6 +317,40 @@ public class DynamoDBLeaseRefresherTest {
Assert.assertTrue(result);
}
@Test
public void testCreateLeaseTableWithTagsIfNotExists() throws Exception {
tags = Collections.singletonList(Tag.builder().key("foo").value("bar").build());
leaseRefresher = new DynamoDBLeaseRefresher(TABLE_NAME, dynamoDbClient, leaseSerializer, CONSISTENT_READS,
tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, BillingMode.PROVISIONED, tags);
when(dynamoDbClient.describeTable(describeTableRequest)).thenReturn(mockDescribeTableFuture);
when(mockDescribeTableFuture.get(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS))
.thenThrow(ResourceNotFoundException.builder().message("Table doesn't exist").build());
final ProvisionedThroughput throughput = ProvisionedThroughput.builder().readCapacityUnits(10L)
.writeCapacityUnits(10L).build();
final CreateTableRequest createTableRequest = CreateTableRequest.builder()
.tableName(TABLE_NAME)
.keySchema(leaseSerializer.getKeySchema())
.attributeDefinitions(leaseSerializer.getAttributeDefinitions())
.provisionedThroughput(throughput)
.tags(tags)
.build();
when(dynamoDbClient.createTable(createTableRequest)).thenReturn(mockCreateTableFuture);
when(mockCreateTableFuture.get(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS))
.thenReturn(null);
final boolean result = leaseRefresher.createLeaseTableIfNotExists(10L, 10L);
verify(dynamoDbClient, times(1)).describeTable(describeTableRequest);
verify(dynamoDbClient, times(1)).createTable(createTableRequest);
verify(mockDescribeTableFuture, times(1))
.get(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
verify(mockCreateTableFuture, times(1))
.get(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
Assert.assertTrue(result);
}
@Test
public void testCreateLeaseTableIfNotExists() throws Exception {
when(dynamoDbClient.describeTable(describeTableRequest)).thenReturn(mockDescribeTableFuture);