diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java index 4f2d3a2b..d80799fa 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java @@ -18,6 +18,7 @@ package software.amazon.kinesis.leases; import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.time.Duration; +import java.util.Collection; import java.util.concurrent.ExecutorService; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; @@ -28,8 +29,11 @@ import lombok.Data; import lombok.NonNull; import lombok.experimental.Accessors; import org.apache.commons.lang3.Validate; + +import software.amazon.awssdk.core.util.DefaultSdkAutoConstructList; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.model.BillingMode; +import software.amazon.awssdk.services.dynamodb.model.Tag; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; @@ -190,6 +194,13 @@ public class LeaseManagementConfig { private BillingMode billingMode = BillingMode.PAY_PER_REQUEST; + /** + * The list of tags to be applied to the DynamoDB table created for lease management. + * + *

Default value: {@link DefaultSdkAutoConstructList} + */ + private Collection tags = DefaultSdkAutoConstructList.getInstance(); + /** * Frequency (in millis) of the auditor job to scan for partial leases in the lease table. * If the auditor detects any hole in the leases for a stream, then it would trigger shard sync based on @@ -333,7 +344,7 @@ public class LeaseManagementConfig { initialLeaseTableReadCapacity(), initialLeaseTableWriteCapacity(), hierarchicalShardSyncer(), - tableCreatorCallback(), dynamoDbRequestTimeout(), billingMode()); + tableCreatorCallback(), dynamoDbRequestTimeout(), billingMode(), tags()); } return leaseManagementFactory; } @@ -371,6 +382,7 @@ public class LeaseManagementConfig { tableCreatorCallback(), dynamoDbRequestTimeout(), billingMode(), + tags(), leaseSerializer, customShardDetectorProvider(), isMultiStreamingMode, diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java index 6bf2ff39..8b60f6dd 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java @@ -16,13 +16,16 @@ package software.amazon.kinesis.leases.dynamodb; import java.time.Duration; +import java.util.Collection; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.function.Function; import lombok.Data; import lombok.NonNull; +import software.amazon.awssdk.core.util.DefaultSdkAutoConstructList; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.model.BillingMode; +import software.amazon.awssdk.services.dynamodb.model.Tag; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.common.InitialPositionInStreamExtended; @@ -86,6 +89,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { private final TableCreatorCallback tableCreatorCallback; private final Duration dynamoDbRequestTimeout; private final BillingMode billingMode; + private final Collection tags; private final boolean isMultiStreamMode; private final LeaseCleanupConfig leaseCleanupConfig; @@ -343,6 +347,61 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { hierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode, new DynamoDBLeaseSerializer()); } + /** + * Constructor. + * + * @param kinesisClient + * @param streamName + * @param dynamoDBClient + * @param tableName + * @param workerIdentifier + * @param executorService + * @param initialPositionInStream + * @param failoverTimeMillis + * @param epsilonMillis + * @param maxLeasesForWorker + * @param maxLeasesToStealAtOneTime + * @param maxLeaseRenewalThreads + * @param cleanupLeasesUponShardCompletion + * @param ignoreUnexpectedChildShards + * @param shardSyncIntervalMillis + * @param consistentReads + * @param listShardsBackoffTimeMillis + * @param maxListShardsRetryAttempts + * @param maxCacheMissesBeforeReload + * @param listShardsCacheAllowedAgeInSeconds + * @param cacheMissWarningModulus + * @param initialLeaseTableReadCapacity + * @param initialLeaseTableWriteCapacity + * @param hierarchicalShardSyncer + * @param tableCreatorCallback + * @param dynamoDbRequestTimeout + * @param billingMode + * @param tags + */ + @Deprecated + public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final String streamName, + final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier, + final ExecutorService executorService, final InitialPositionInStreamExtended initialPositionInStream, + final long failoverTimeMillis, final long epsilonMillis, final int maxLeasesForWorker, + final int maxLeasesToStealAtOneTime, final int maxLeaseRenewalThreads, + final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards, + final long shardSyncIntervalMillis, final boolean consistentReads, final long listShardsBackoffTimeMillis, + final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload, + final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus, + final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity, + final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback, + Duration dynamoDbRequestTimeout, BillingMode billingMode, Collection tags) { + + this(kinesisClient, new StreamConfig(StreamIdentifier.singleStreamInstance(streamName), initialPositionInStream), dynamoDBClient, tableName, + workerIdentifier, executorService, failoverTimeMillis, epsilonMillis, maxLeasesForWorker, + maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion, + ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis, + maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds, + cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity, + hierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode, new DynamoDBLeaseSerializer()); + } + /** * Constructor. * @@ -373,6 +432,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { * @param dynamoDbRequestTimeout * @param billingMode */ + @Deprecated private DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final StreamConfig streamConfig, final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier, final ExecutorService executorService, final long failoverTimeMillis, final long epsilonMillis, @@ -384,13 +444,65 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity, final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback, Duration dynamoDbRequestTimeout, BillingMode billingMode, LeaseSerializer leaseSerializer) { + this(kinesisClient, streamConfig, dynamoDBClient, tableName, + workerIdentifier, executorService, failoverTimeMillis, epsilonMillis, maxLeasesForWorker, + maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion, + ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis, + maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds, + cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity, + deprecatedHierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode, + DefaultSdkAutoConstructList.getInstance(), leaseSerializer); + } + + /** + * Constructor. + * + * @param kinesisClient + * @param streamConfig + * @param dynamoDBClient + * @param tableName + * @param workerIdentifier + * @param executorService + * @param failoverTimeMillis + * @param epsilonMillis + * @param maxLeasesForWorker + * @param maxLeasesToStealAtOneTime + * @param maxLeaseRenewalThreads + * @param cleanupLeasesUponShardCompletion + * @param ignoreUnexpectedChildShards + * @param shardSyncIntervalMillis + * @param consistentReads + * @param listShardsBackoffTimeMillis + * @param maxListShardsRetryAttempts + * @param maxCacheMissesBeforeReload + * @param listShardsCacheAllowedAgeInSeconds + * @param cacheMissWarningModulus + * @param initialLeaseTableReadCapacity + * @param initialLeaseTableWriteCapacity + * @param deprecatedHierarchicalShardSyncer + * @param tableCreatorCallback + * @param dynamoDbRequestTimeout + * @param billingMode + * @param tags + */ + private DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final StreamConfig streamConfig, + final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier, + final ExecutorService executorService, final long failoverTimeMillis, final long epsilonMillis, + final int maxLeasesForWorker, final int maxLeasesToStealAtOneTime, final int maxLeaseRenewalThreads, + final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards, + final long shardSyncIntervalMillis, final boolean consistentReads, final long listShardsBackoffTimeMillis, + final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload, + final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus, + final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity, + final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback, + Duration dynamoDbRequestTimeout, BillingMode billingMode, Collection tags, LeaseSerializer leaseSerializer) { this(kinesisClient, dynamoDBClient, tableName, workerIdentifier, executorService, failoverTimeMillis, epsilonMillis, maxLeasesForWorker, maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis, maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds, cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity, - deprecatedHierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode, leaseSerializer, + deprecatedHierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode, tags, leaseSerializer, null, false, LeaseManagementConfig.DEFAULT_LEASE_CLEANUP_CONFIG); this.streamConfig = streamConfig; } @@ -437,7 +549,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus, final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity, final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback, - Duration dynamoDbRequestTimeout, BillingMode billingMode, LeaseSerializer leaseSerializer, + Duration dynamoDbRequestTimeout, BillingMode billingMode, Collection tags, LeaseSerializer leaseSerializer, Function customShardDetectorProvider, boolean isMultiStreamMode, LeaseCleanupConfig leaseCleanupConfig) { this.kinesisClient = kinesisClient; @@ -469,6 +581,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { this.customShardDetectorProvider = customShardDetectorProvider; this.isMultiStreamMode = isMultiStreamMode; this.leaseCleanupConfig = leaseCleanupConfig; + this.tags = tags; } @Override @@ -535,7 +648,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { @Override public DynamoDBLeaseRefresher createLeaseRefresher() { return new DynamoDBLeaseRefresher(tableName, dynamoDBClient, leaseSerializer, consistentReads, - tableCreatorCallback, dynamoDbRequestTimeout, billingMode); + tableCreatorCallback, dynamoDbRequestTimeout, billingMode, tags); } @Override diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java index acb61a38..a6887f40 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java @@ -18,6 +18,7 @@ import com.google.common.collect.ImmutableMap; import java.time.Duration; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; @@ -25,6 +26,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; +import software.amazon.awssdk.core.util.DefaultSdkAutoConstructList; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.model.AttributeValue; import software.amazon.awssdk.services.dynamodb.model.AttributeValueUpdate; @@ -46,6 +48,7 @@ import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException; import software.amazon.awssdk.services.dynamodb.model.ScanRequest; import software.amazon.awssdk.services.dynamodb.model.ScanResponse; import software.amazon.awssdk.services.dynamodb.model.TableStatus; +import software.amazon.awssdk.services.dynamodb.model.Tag; import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest; import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.kinesis.annotations.KinesisClientInternalApi; @@ -77,6 +80,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { private final Duration dynamoDbRequestTimeout; private final BillingMode billingMode; + private final Collection tags; private boolean newTableCreated = false; @@ -143,10 +147,29 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { * @param dynamoDbRequestTimeout * @param billingMode */ + @Deprecated public DynamoDBLeaseRefresher(final String table, final DynamoDbAsyncClient dynamoDBClient, final LeaseSerializer serializer, final boolean consistentReads, @NonNull final TableCreatorCallback tableCreatorCallback, Duration dynamoDbRequestTimeout, final BillingMode billingMode) { + this(table, dynamoDBClient, serializer, consistentReads, tableCreatorCallback, dynamoDbRequestTimeout, billingMode, DefaultSdkAutoConstructList.getInstance()); + } + + /** + * Constructor. + * @param table + * @param dynamoDBClient + * @param serializer + * @param consistentReads + * @param tableCreatorCallback + * @param dynamoDbRequestTimeout + * @param billingMode + * @param tags + */ + public DynamoDBLeaseRefresher(final String table, final DynamoDbAsyncClient dynamoDBClient, + final LeaseSerializer serializer, final boolean consistentReads, + @NonNull final TableCreatorCallback tableCreatorCallback, Duration dynamoDbRequestTimeout, + final BillingMode billingMode, final Collection tags) { this.table = table; this.dynamoDBClient = dynamoDBClient; this.serializer = serializer; @@ -154,6 +177,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { this.tableCreatorCallback = tableCreatorCallback; this.dynamoDbRequestTimeout = dynamoDbRequestTimeout; this.billingMode = billingMode; + this.tags = tags; } /** @@ -162,20 +186,13 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { @Override public boolean createLeaseTableIfNotExists(@NonNull final Long readCapacity, @NonNull final Long writeCapacity) throws ProvisionedThroughputException, DependencyException { - ProvisionedThroughput throughput = ProvisionedThroughput.builder().readCapacityUnits(readCapacity) + final CreateTableRequest.Builder builder = createTableRequestBuilder(); + if(BillingMode.PROVISIONED.equals(billingMode)) { + ProvisionedThroughput throughput = ProvisionedThroughput.builder().readCapacityUnits(readCapacity) .writeCapacityUnits(writeCapacity).build(); - final CreateTableRequest request; - if(BillingMode.PAY_PER_REQUEST.equals(billingMode)){ - request = CreateTableRequest.builder().tableName(table).keySchema(serializer.getKeySchema()) - .attributeDefinitions(serializer.getAttributeDefinitions()) - .billingMode(billingMode).build(); - } else { - request = CreateTableRequest.builder().tableName(table).keySchema(serializer.getKeySchema()) - .attributeDefinitions(serializer.getAttributeDefinitions()).provisionedThroughput(throughput) - .build(); + builder.provisionedThroughput(throughput); } - - return createTableIfNotExists(request); + return createTableIfNotExists(builder.build()); } /** @@ -184,9 +201,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { @Override public boolean createLeaseTableIfNotExists() throws ProvisionedThroughputException, DependencyException { - final CreateTableRequest request = CreateTableRequest.builder().tableName(table).keySchema(serializer.getKeySchema()) - .attributeDefinitions(serializer.getAttributeDefinitions()) - .billingMode(billingMode).build(); + final CreateTableRequest request = createTableRequestBuilder().build(); return createTableIfNotExists(request); } @@ -787,6 +802,16 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { } } + private CreateTableRequest.Builder createTableRequestBuilder() { + final CreateTableRequest.Builder builder = CreateTableRequest.builder().tableName(table).keySchema(serializer.getKeySchema()) + .attributeDefinitions(serializer.getAttributeDefinitions()) + .tags(tags); + if (BillingMode.PAY_PER_REQUEST.equals(billingMode)) { + builder.billingMode(billingMode); + } + return builder; + } + private AWSExceptionManager createExceptionManager() { final AWSExceptionManager exceptionManager = new AWSExceptionManager(); exceptionManager.add(DynamoDbException.class, t -> t); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java index ac814d75..643cc99c 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java @@ -26,6 +26,8 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -66,6 +68,7 @@ import software.amazon.awssdk.services.dynamodb.model.TableDescription; import software.amazon.awssdk.services.dynamodb.model.TableStatus; import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest; import software.amazon.awssdk.services.dynamodb.model.UpdateItemResponse; +import software.amazon.awssdk.services.dynamodb.model.Tag; import software.amazon.awssdk.services.dynamodb.model.BillingMode; import software.amazon.kinesis.leases.Lease; @@ -109,6 +112,7 @@ public class DynamoDBLeaseRefresherTest { private DynamoDBLeaseRefresher leaseRefresher; private DescribeTableRequest describeTableRequest; private CreateTableRequest createTableRequest; + private Collection tags; private Map serializedLease; @@ -313,6 +317,40 @@ public class DynamoDBLeaseRefresherTest { Assert.assertTrue(result); } + @Test + public void testCreateLeaseTableWithTagsIfNotExists() throws Exception { + tags = Collections.singletonList(Tag.builder().key("foo").value("bar").build()); + leaseRefresher = new DynamoDBLeaseRefresher(TABLE_NAME, dynamoDbClient, leaseSerializer, CONSISTENT_READS, + tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, BillingMode.PROVISIONED, tags); + + when(dynamoDbClient.describeTable(describeTableRequest)).thenReturn(mockDescribeTableFuture); + when(mockDescribeTableFuture.get(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)) + .thenThrow(ResourceNotFoundException.builder().message("Table doesn't exist").build()); + + final ProvisionedThroughput throughput = ProvisionedThroughput.builder().readCapacityUnits(10L) + .writeCapacityUnits(10L).build(); + final CreateTableRequest createTableRequest = CreateTableRequest.builder() + .tableName(TABLE_NAME) + .keySchema(leaseSerializer.getKeySchema()) + .attributeDefinitions(leaseSerializer.getAttributeDefinitions()) + .provisionedThroughput(throughput) + .tags(tags) + .build(); + when(dynamoDbClient.createTable(createTableRequest)).thenReturn(mockCreateTableFuture); + when(mockCreateTableFuture.get(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)) + .thenReturn(null); + + final boolean result = leaseRefresher.createLeaseTableIfNotExists(10L, 10L); + + verify(dynamoDbClient, times(1)).describeTable(describeTableRequest); + verify(dynamoDbClient, times(1)).createTable(createTableRequest); + verify(mockDescribeTableFuture, times(1)) + .get(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS); + verify(mockCreateTableFuture, times(1)) + .get(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS); + Assert.assertTrue(result); + } + @Test public void testCreateLeaseTableIfNotExists() throws Exception { when(dynamoDbClient.describeTable(describeTableRequest)).thenReturn(mockDescribeTableFuture);