Allow tags to be added when lease table is created

This commit is contained in:
Ryan French 2023-03-16 12:14:44 +00:00
parent 9ff99f0182
commit 8ee80f9ea4
4 changed files with 192 additions and 5 deletions

View file

@ -18,6 +18,7 @@ package software.amazon.kinesis.leases;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.time.Duration;
import java.util.Collection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
@ -28,8 +29,11 @@ import lombok.Data;
import lombok.NonNull;
import lombok.experimental.Accessors;
import org.apache.commons.lang3.Validate;
import software.amazon.awssdk.core.util.DefaultSdkAutoConstructList;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.dynamodb.model.Tag;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
@ -190,6 +194,8 @@ public class LeaseManagementConfig {
private BillingMode billingMode = BillingMode.PAY_PER_REQUEST;
private Collection<Tag> tags = DefaultSdkAutoConstructList.getInstance();
/**
* Frequency (in millis) of the auditor job to scan for partial leases in the lease table.
* If the auditor detects any hole in the leases for a stream, then it would trigger shard sync based on
@ -333,7 +339,7 @@ public class LeaseManagementConfig {
initialLeaseTableReadCapacity(),
initialLeaseTableWriteCapacity(),
hierarchicalShardSyncer(),
tableCreatorCallback(), dynamoDbRequestTimeout(), billingMode());
tableCreatorCallback(), dynamoDbRequestTimeout(), billingMode(), tags());
}
return leaseManagementFactory;
}
@ -371,6 +377,7 @@ public class LeaseManagementConfig {
tableCreatorCallback(),
dynamoDbRequestTimeout(),
billingMode(),
tags(),
leaseSerializer,
customShardDetectorProvider(),
isMultiStreamingMode,

View file

@ -16,13 +16,16 @@
package software.amazon.kinesis.leases.dynamodb;
import java.time.Duration;
import java.util.Collection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import lombok.Data;
import lombok.NonNull;
import software.amazon.awssdk.core.util.DefaultSdkAutoConstructList;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.dynamodb.model.Tag;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
@ -86,6 +89,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
private final TableCreatorCallback tableCreatorCallback;
private final Duration dynamoDbRequestTimeout;
private final BillingMode billingMode;
private final Collection<Tag> tags;
private final boolean isMultiStreamMode;
private final LeaseCleanupConfig leaseCleanupConfig;
@ -343,6 +347,61 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
hierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode, new DynamoDBLeaseSerializer());
}
/**
* Constructor.
*
* @param kinesisClient
* @param streamName
* @param dynamoDBClient
* @param tableName
* @param workerIdentifier
* @param executorService
* @param initialPositionInStream
* @param failoverTimeMillis
* @param epsilonMillis
* @param maxLeasesForWorker
* @param maxLeasesToStealAtOneTime
* @param maxLeaseRenewalThreads
* @param cleanupLeasesUponShardCompletion
* @param ignoreUnexpectedChildShards
* @param shardSyncIntervalMillis
* @param consistentReads
* @param listShardsBackoffTimeMillis
* @param maxListShardsRetryAttempts
* @param maxCacheMissesBeforeReload
* @param listShardsCacheAllowedAgeInSeconds
* @param cacheMissWarningModulus
* @param initialLeaseTableReadCapacity
* @param initialLeaseTableWriteCapacity
* @param hierarchicalShardSyncer
* @param tableCreatorCallback
* @param dynamoDbRequestTimeout
* @param billingMode
* @param tags
*/
@Deprecated
public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final String streamName,
final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier,
final ExecutorService executorService, final InitialPositionInStreamExtended initialPositionInStream,
final long failoverTimeMillis, final long epsilonMillis, final int maxLeasesForWorker,
final int maxLeasesToStealAtOneTime, final int maxLeaseRenewalThreads,
final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards,
final long shardSyncIntervalMillis, final boolean consistentReads, final long listShardsBackoffTimeMillis,
final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload,
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus,
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity,
final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
Duration dynamoDbRequestTimeout, BillingMode billingMode, Collection<Tag> tags) {
this(kinesisClient, new StreamConfig(StreamIdentifier.singleStreamInstance(streamName), initialPositionInStream), dynamoDBClient, tableName,
workerIdentifier, executorService, failoverTimeMillis, epsilonMillis, maxLeasesForWorker,
maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity,
hierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode, new DynamoDBLeaseSerializer());
}
/**
* Constructor.
*
@ -373,6 +432,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
* @param dynamoDbRequestTimeout
* @param billingMode
*/
@Deprecated
private DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final StreamConfig streamConfig,
final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier,
final ExecutorService executorService, final long failoverTimeMillis, final long epsilonMillis,
@ -384,13 +444,65 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity,
final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
Duration dynamoDbRequestTimeout, BillingMode billingMode, LeaseSerializer leaseSerializer) {
this(kinesisClient, streamConfig, dynamoDBClient, tableName,
workerIdentifier, executorService, failoverTimeMillis, epsilonMillis, maxLeasesForWorker,
maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity,
deprecatedHierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode,
DefaultSdkAutoConstructList.getInstance(), leaseSerializer);
}
/**
* Constructor.
*
* @param kinesisClient
* @param streamConfig
* @param dynamoDBClient
* @param tableName
* @param workerIdentifier
* @param executorService
* @param failoverTimeMillis
* @param epsilonMillis
* @param maxLeasesForWorker
* @param maxLeasesToStealAtOneTime
* @param maxLeaseRenewalThreads
* @param cleanupLeasesUponShardCompletion
* @param ignoreUnexpectedChildShards
* @param shardSyncIntervalMillis
* @param consistentReads
* @param listShardsBackoffTimeMillis
* @param maxListShardsRetryAttempts
* @param maxCacheMissesBeforeReload
* @param listShardsCacheAllowedAgeInSeconds
* @param cacheMissWarningModulus
* @param initialLeaseTableReadCapacity
* @param initialLeaseTableWriteCapacity
* @param deprecatedHierarchicalShardSyncer
* @param tableCreatorCallback
* @param dynamoDbRequestTimeout
* @param billingMode
* @param tags
*/
private DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final StreamConfig streamConfig,
final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier,
final ExecutorService executorService, final long failoverTimeMillis, final long epsilonMillis,
final int maxLeasesForWorker, final int maxLeasesToStealAtOneTime, final int maxLeaseRenewalThreads,
final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards,
final long shardSyncIntervalMillis, final boolean consistentReads, final long listShardsBackoffTimeMillis,
final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload,
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus,
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity,
final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
Duration dynamoDbRequestTimeout, BillingMode billingMode, Collection<Tag> tags, LeaseSerializer leaseSerializer) {
this(kinesisClient, dynamoDBClient, tableName,
workerIdentifier, executorService, failoverTimeMillis, epsilonMillis, maxLeasesForWorker,
maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity,
deprecatedHierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode, leaseSerializer,
deprecatedHierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode, tags, leaseSerializer,
null, false, LeaseManagementConfig.DEFAULT_LEASE_CLEANUP_CONFIG);
this.streamConfig = streamConfig;
}
@ -437,7 +549,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus,
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity,
final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
Duration dynamoDbRequestTimeout, BillingMode billingMode, LeaseSerializer leaseSerializer,
Duration dynamoDbRequestTimeout, BillingMode billingMode, Collection<Tag> tags, LeaseSerializer leaseSerializer,
Function<StreamConfig, ShardDetector> customShardDetectorProvider, boolean isMultiStreamMode,
LeaseCleanupConfig leaseCleanupConfig) {
this.kinesisClient = kinesisClient;
@ -469,6 +581,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
this.customShardDetectorProvider = customShardDetectorProvider;
this.isMultiStreamMode = isMultiStreamMode;
this.leaseCleanupConfig = leaseCleanupConfig;
this.tags = tags;
}
@Override

View file

@ -18,6 +18,7 @@ import com.google.common.collect.ImmutableMap;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
@ -25,6 +26,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.core.util.DefaultSdkAutoConstructList;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.AttributeValueUpdate;
@ -46,6 +48,7 @@ import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
import software.amazon.awssdk.services.dynamodb.model.TableStatus;
import software.amazon.awssdk.services.dynamodb.model.Tag;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
@ -77,6 +80,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
private final Duration dynamoDbRequestTimeout;
private final BillingMode billingMode;
private final Collection<Tag> tags;
private boolean newTableCreated = false;
@ -143,10 +147,29 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
* @param dynamoDbRequestTimeout
* @param billingMode
*/
@Deprecated
public DynamoDBLeaseRefresher(final String table, final DynamoDbAsyncClient dynamoDBClient,
final LeaseSerializer serializer, final boolean consistentReads,
@NonNull final TableCreatorCallback tableCreatorCallback, Duration dynamoDbRequestTimeout,
final BillingMode billingMode) {
this(table, dynamoDBClient, serializer, consistentReads, tableCreatorCallback, dynamoDbRequestTimeout, billingMode, DefaultSdkAutoConstructList.getInstance());
}
/**
* Constructor.
* @param table
* @param dynamoDBClient
* @param serializer
* @param consistentReads
* @param tableCreatorCallback
* @param dynamoDbRequestTimeout
* @param billingMode
* @param tags
*/
public DynamoDBLeaseRefresher(final String table, final DynamoDbAsyncClient dynamoDBClient,
final LeaseSerializer serializer, final boolean consistentReads,
@NonNull final TableCreatorCallback tableCreatorCallback, Duration dynamoDbRequestTimeout,
final BillingMode billingMode, final Collection<Tag> tags) {
this.table = table;
this.dynamoDBClient = dynamoDBClient;
this.serializer = serializer;
@ -154,6 +177,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
this.tableCreatorCallback = tableCreatorCallback;
this.dynamoDbRequestTimeout = dynamoDbRequestTimeout;
this.billingMode = billingMode;
this.tags = tags;
}
/**
@ -168,10 +192,13 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
if(BillingMode.PAY_PER_REQUEST.equals(billingMode)){
request = CreateTableRequest.builder().tableName(table).keySchema(serializer.getKeySchema())
.attributeDefinitions(serializer.getAttributeDefinitions())
.billingMode(billingMode).build();
.billingMode(billingMode)
.tags(tags)
.build();
} else {
request = CreateTableRequest.builder().tableName(table).keySchema(serializer.getKeySchema())
.attributeDefinitions(serializer.getAttributeDefinitions()).provisionedThroughput(throughput)
.tags(tags)
.build();
}
@ -186,7 +213,9 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
throws ProvisionedThroughputException, DependencyException {
final CreateTableRequest request = CreateTableRequest.builder().tableName(table).keySchema(serializer.getKeySchema())
.attributeDefinitions(serializer.getAttributeDefinitions())
.billingMode(billingMode).build();
.billingMode(billingMode)
.tags(tags)
.build();
return createTableIfNotExists(request);
}

View file

@ -26,6 +26,8 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@ -66,6 +68,7 @@ import software.amazon.awssdk.services.dynamodb.model.TableDescription;
import software.amazon.awssdk.services.dynamodb.model.TableStatus;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemResponse;
import software.amazon.awssdk.services.dynamodb.model.Tag;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.kinesis.leases.Lease;
@ -109,6 +112,7 @@ public class DynamoDBLeaseRefresherTest {
private DynamoDBLeaseRefresher leaseRefresher;
private DescribeTableRequest describeTableRequest;
private CreateTableRequest createTableRequest;
private Collection<Tag> tags;
private Map<String, AttributeValue> serializedLease;
@ -313,6 +317,40 @@ public class DynamoDBLeaseRefresherTest {
Assert.assertTrue(result);
}
@Test
public void testCreateLeaseTableWithTagsIfNotExists() throws Exception {
tags = Arrays.asList(Tag.builder().key("foo").value("bar").build());
leaseRefresher = new DynamoDBLeaseRefresher(TABLE_NAME, dynamoDbClient, leaseSerializer, CONSISTENT_READS,
tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, BillingMode.PROVISIONED, tags);
when(dynamoDbClient.describeTable(describeTableRequest)).thenReturn(mockDescribeTableFuture);
when(mockDescribeTableFuture.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS)))
.thenThrow(ResourceNotFoundException.builder().message("Table doesn't exist").build());
final ProvisionedThroughput throughput = ProvisionedThroughput.builder().readCapacityUnits(10L)
.writeCapacityUnits(10L).build();
final CreateTableRequest createTableRequest = CreateTableRequest.builder()
.tableName(TABLE_NAME)
.keySchema(leaseSerializer.getKeySchema())
.attributeDefinitions(leaseSerializer.getAttributeDefinitions())
.provisionedThroughput(throughput)
.tags(tags)
.build();
when(dynamoDbClient.createTable(createTableRequest)).thenReturn(mockCreateTableFuture);
when(mockCreateTableFuture.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS)))
.thenReturn(null);
final boolean result = leaseRefresher.createLeaseTableIfNotExists(10L, 10L);
verify(dynamoDbClient, times(1)).describeTable(describeTableRequest);
verify(dynamoDbClient, times(1)).createTable(createTableRequest);
verify(mockDescribeTableFuture, times(1))
.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS));
verify(mockCreateTableFuture, times(1))
.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS));
Assert.assertTrue(result);
}
@Test
public void testCreateLeaseTableIfNotExists() throws Exception {
when(dynamoDbClient.describeTable(describeTableRequest)).thenReturn(mockDescribeTableFuture);