diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java index a5f088ce..20e0aa8f 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java @@ -28,6 +28,7 @@ import lombok.Data; import lombok.NonNull; import lombok.experimental.Accessors; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; +import software.amazon.awssdk.services.dynamodb.model.BillingMode; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; @@ -165,6 +166,8 @@ public class LeaseManagementConfig { private Duration dynamoDbRequestTimeout = DEFAULT_REQUEST_TIMEOUT; + private BillingMode billingMode = BillingMode.PROVISIONED; + /** * The initial position for getting records from Kinesis streams. * @@ -267,7 +270,7 @@ public class LeaseManagementConfig { initialLeaseTableReadCapacity(), initialLeaseTableWriteCapacity(), hierarchicalShardSyncer(), - tableCreatorCallback(), dynamoDbRequestTimeout()); + tableCreatorCallback(), dynamoDbRequestTimeout(), billingMode()); } return leaseManagementFactory; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java index c773aab2..c2ade429 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java @@ -21,6 +21,7 @@ import java.util.concurrent.ExecutorService; import lombok.Data; import lombok.NonNull; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; +import software.amazon.awssdk.services.dynamodb.model.BillingMode; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.common.InitialPositionInStreamExtended; @@ -75,6 +76,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { private final long initialLeaseTableWriteCapacity; private final TableCreatorCallback tableCreatorCallback; private final Duration dynamoDbRequestTimeout; + private final BillingMode billingMode; /** * Constructor. @@ -254,6 +256,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { * @param tableCreatorCallback * @param dynamoDbRequestTimeout */ + @Deprecated public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final String streamName, final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier, final ExecutorService executorService, final InitialPositionInStreamExtended initialPositionInStream, @@ -266,6 +269,58 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity, final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback, Duration dynamoDbRequestTimeout) { + this(kinesisClient, streamName, dynamoDBClient, tableName, workerIdentifier, executorService, + initialPositionInStream, failoverTimeMillis, epsilonMillis, maxLeasesForWorker, + maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion, + ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis, + maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds, + cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity, + hierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, BillingMode.PROVISIONED); + } + + /** + * Constructor. + * + * @param kinesisClient + * @param streamName + * @param dynamoDBClient + * @param tableName + * @param workerIdentifier + * @param executorService + * @param initialPositionInStream + * @param failoverTimeMillis + * @param epsilonMillis + * @param maxLeasesForWorker + * @param maxLeasesToStealAtOneTime + * @param maxLeaseRenewalThreads + * @param cleanupLeasesUponShardCompletion + * @param ignoreUnexpectedChildShards + * @param shardSyncIntervalMillis + * @param consistentReads + * @param listShardsBackoffTimeMillis + * @param maxListShardsRetryAttempts + * @param maxCacheMissesBeforeReload + * @param listShardsCacheAllowedAgeInSeconds + * @param cacheMissWarningModulus + * @param initialLeaseTableReadCapacity + * @param initialLeaseTableWriteCapacity + * @param hierarchicalShardSyncer + * @param tableCreatorCallback + * @param dynamoDbRequestTimeout + * @param billingMode + */ + public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final String streamName, + final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier, + final ExecutorService executorService, final InitialPositionInStreamExtended initialPositionInStream, + final long failoverTimeMillis, final long epsilonMillis, final int maxLeasesForWorker, + final int maxLeasesToStealAtOneTime, final int maxLeaseRenewalThreads, + final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards, + final long shardSyncIntervalMillis, final boolean consistentReads, final long listShardsBackoffTimeMillis, + final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload, + final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus, + final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity, + final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback, + Duration dynamoDbRequestTimeout, BillingMode billingMode) { this.kinesisClient = kinesisClient; this.streamName = streamName; this.dynamoDBClient = dynamoDBClient; @@ -292,6 +347,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { this.hierarchicalShardSyncer = hierarchicalShardSyncer; this.tableCreatorCallback = tableCreatorCallback; this.dynamoDbRequestTimeout = dynamoDbRequestTimeout; + this.billingMode = billingMode; } @Override @@ -324,7 +380,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { @Override public DynamoDBLeaseRefresher createLeaseRefresher() { return new DynamoDBLeaseRefresher(tableName, dynamoDBClient, new DynamoDBLeaseSerializer(), consistentReads, - tableCreatorCallback, dynamoDbRequestTimeout); + tableCreatorCallback, dynamoDbRequestTimeout, billingMode); } @Override diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java index 3520ed83..9badb6d7 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java @@ -25,26 +25,7 @@ import java.util.concurrent.TimeoutException; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; -import software.amazon.awssdk.services.dynamodb.model.AttributeValue; -import software.amazon.awssdk.services.dynamodb.model.AttributeValueUpdate; -import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException; -import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest; -import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest; -import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest; -import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse; -import software.amazon.awssdk.services.dynamodb.model.DynamoDbException; -import software.amazon.awssdk.services.dynamodb.model.GetItemRequest; -import software.amazon.awssdk.services.dynamodb.model.GetItemResponse; -import software.amazon.awssdk.services.dynamodb.model.LimitExceededException; -import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput; -import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughputExceededException; -import software.amazon.awssdk.services.dynamodb.model.PutItemRequest; -import software.amazon.awssdk.services.dynamodb.model.ResourceInUseException; -import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException; -import software.amazon.awssdk.services.dynamodb.model.ScanRequest; -import software.amazon.awssdk.services.dynamodb.model.ScanResponse; -import software.amazon.awssdk.services.dynamodb.model.TableStatus; -import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest; +import software.amazon.awssdk.services.dynamodb.model.*; import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.common.FutureUtils; @@ -72,6 +53,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { private final TableCreatorCallback tableCreatorCallback; private final Duration dynamoDbRequestTimeout; + private final BillingMode billingMode; private boolean newTableCreated = false; @@ -111,22 +93,41 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { /** * Constructor. - * @param table + * @param table * @param dynamoDBClient * @param serializer * @param consistentReads * @param tableCreatorCallback * @param dynamoDbRequestTimeout */ + @Deprecated public DynamoDBLeaseRefresher(final String table, final DynamoDbAsyncClient dynamoDBClient, final LeaseSerializer serializer, final boolean consistentReads, @NonNull final TableCreatorCallback tableCreatorCallback, Duration dynamoDbRequestTimeout) { + this(table, dynamoDBClient, serializer, consistentReads, tableCreatorCallback, dynamoDbRequestTimeout, BillingMode.PROVISIONED); + } + + /** + * Constructor. + * @param table + * @param dynamoDBClient + * @param serializer + * @param consistentReads + * @param tableCreatorCallback + * @param dynamoDbRequestTimeout + * @param billingMode + */ + public DynamoDBLeaseRefresher(final String table, final DynamoDbAsyncClient dynamoDBClient, + final LeaseSerializer serializer, final boolean consistentReads, + @NonNull final TableCreatorCallback tableCreatorCallback, Duration dynamoDbRequestTimeout, + final BillingMode billingMode) { this.table = table; this.dynamoDBClient = dynamoDBClient; this.serializer = serializer; this.consistentReads = consistentReads; this.tableCreatorCallback = tableCreatorCallback; this.dynamoDbRequestTimeout = dynamoDbRequestTimeout; + this.billingMode = billingMode; } /** @@ -148,7 +149,8 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { ProvisionedThroughput throughput = ProvisionedThroughput.builder().readCapacityUnits(readCapacity) .writeCapacityUnits(writeCapacity).build(); CreateTableRequest request = CreateTableRequest.builder().tableName(table).keySchema(serializer.getKeySchema()) - .attributeDefinitions(serializer.getAttributeDefinitions()).provisionedThroughput(throughput).build(); + .attributeDefinitions(serializer.getAttributeDefinitions()).provisionedThroughput(throughput) + .billingMode(billingMode).build(); final AWSExceptionManager exceptionManager = createExceptionManager(); exceptionManager.add(ResourceInUseException.class, t -> t);