Configurable BillingModes

This commit is contained in:
Eric Meisel 2019-08-07 20:57:43 -05:00
parent a150402e9c
commit 48ac2f27c8
No known key found for this signature in database
GPG key ID: 90EDD2395C0B6CDD
3 changed files with 85 additions and 24 deletions

View file

@ -28,6 +28,7 @@ import lombok.Data;
import lombok.NonNull;
import lombok.experimental.Accessors;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
@ -165,6 +166,8 @@ public class LeaseManagementConfig {
private Duration dynamoDbRequestTimeout = DEFAULT_REQUEST_TIMEOUT;
private BillingMode billingMode = BillingMode.PROVISIONED;
/**
* The initial position for getting records from Kinesis streams.
*
@ -267,7 +270,7 @@ public class LeaseManagementConfig {
initialLeaseTableReadCapacity(),
initialLeaseTableWriteCapacity(),
hierarchicalShardSyncer(),
tableCreatorCallback(), dynamoDbRequestTimeout());
tableCreatorCallback(), dynamoDbRequestTimeout(), billingMode());
}
return leaseManagementFactory;
}

View file

@ -21,6 +21,7 @@ import java.util.concurrent.ExecutorService;
import lombok.Data;
import lombok.NonNull;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
@ -75,6 +76,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
private final long initialLeaseTableWriteCapacity;
private final TableCreatorCallback tableCreatorCallback;
private final Duration dynamoDbRequestTimeout;
private final BillingMode billingMode;
/**
* Constructor.
@ -254,6 +256,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
* @param tableCreatorCallback
* @param dynamoDbRequestTimeout
*/
@Deprecated
public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final String streamName,
final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier,
final ExecutorService executorService, final InitialPositionInStreamExtended initialPositionInStream,
@ -266,6 +269,58 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity,
final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
Duration dynamoDbRequestTimeout) {
this(kinesisClient, streamName, dynamoDBClient, tableName, workerIdentifier, executorService,
initialPositionInStream, failoverTimeMillis, epsilonMillis, maxLeasesForWorker,
maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity,
hierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, BillingMode.PROVISIONED);
}
/**
* Constructor.
*
* @param kinesisClient
* @param streamName
* @param dynamoDBClient
* @param tableName
* @param workerIdentifier
* @param executorService
* @param initialPositionInStream
* @param failoverTimeMillis
* @param epsilonMillis
* @param maxLeasesForWorker
* @param maxLeasesToStealAtOneTime
* @param maxLeaseRenewalThreads
* @param cleanupLeasesUponShardCompletion
* @param ignoreUnexpectedChildShards
* @param shardSyncIntervalMillis
* @param consistentReads
* @param listShardsBackoffTimeMillis
* @param maxListShardsRetryAttempts
* @param maxCacheMissesBeforeReload
* @param listShardsCacheAllowedAgeInSeconds
* @param cacheMissWarningModulus
* @param initialLeaseTableReadCapacity
* @param initialLeaseTableWriteCapacity
* @param hierarchicalShardSyncer
* @param tableCreatorCallback
* @param dynamoDbRequestTimeout
* @param billingMode
*/
public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final String streamName,
final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier,
final ExecutorService executorService, final InitialPositionInStreamExtended initialPositionInStream,
final long failoverTimeMillis, final long epsilonMillis, final int maxLeasesForWorker,
final int maxLeasesToStealAtOneTime, final int maxLeaseRenewalThreads,
final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards,
final long shardSyncIntervalMillis, final boolean consistentReads, final long listShardsBackoffTimeMillis,
final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload,
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus,
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity,
final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
Duration dynamoDbRequestTimeout, BillingMode billingMode) {
this.kinesisClient = kinesisClient;
this.streamName = streamName;
this.dynamoDBClient = dynamoDBClient;
@ -292,6 +347,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
this.hierarchicalShardSyncer = hierarchicalShardSyncer;
this.tableCreatorCallback = tableCreatorCallback;
this.dynamoDbRequestTimeout = dynamoDbRequestTimeout;
this.billingMode = billingMode;
}
@Override
@ -324,7 +380,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
@Override
public DynamoDBLeaseRefresher createLeaseRefresher() {
return new DynamoDBLeaseRefresher(tableName, dynamoDBClient, new DynamoDBLeaseSerializer(), consistentReads,
tableCreatorCallback, dynamoDbRequestTimeout);
tableCreatorCallback, dynamoDbRequestTimeout, billingMode);
}
@Override

View file

@ -25,26 +25,7 @@ import java.util.concurrent.TimeoutException;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.AttributeValueUpdate;
import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.awssdk.services.dynamodb.model.DynamoDbException;
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
import software.amazon.awssdk.services.dynamodb.model.LimitExceededException;
import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput;
import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughputExceededException;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.ResourceInUseException;
import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
import software.amazon.awssdk.services.dynamodb.model.TableStatus;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
import software.amazon.awssdk.services.dynamodb.model.*;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.FutureUtils;
@ -72,6 +53,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
private final TableCreatorCallback tableCreatorCallback;
private final Duration dynamoDbRequestTimeout;
private final BillingMode billingMode;
private boolean newTableCreated = false;
@ -118,15 +100,34 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
* @param tableCreatorCallback
* @param dynamoDbRequestTimeout
*/
@Deprecated
public DynamoDBLeaseRefresher(final String table, final DynamoDbAsyncClient dynamoDBClient,
final LeaseSerializer serializer, final boolean consistentReads,
@NonNull final TableCreatorCallback tableCreatorCallback, Duration dynamoDbRequestTimeout) {
this(table, dynamoDBClient, serializer, consistentReads, tableCreatorCallback, dynamoDbRequestTimeout, BillingMode.PROVISIONED);
}
/**
* Constructor.
* @param table
* @param dynamoDBClient
* @param serializer
* @param consistentReads
* @param tableCreatorCallback
* @param dynamoDbRequestTimeout
* @param billingMode
*/
public DynamoDBLeaseRefresher(final String table, final DynamoDbAsyncClient dynamoDBClient,
final LeaseSerializer serializer, final boolean consistentReads,
@NonNull final TableCreatorCallback tableCreatorCallback, Duration dynamoDbRequestTimeout,
final BillingMode billingMode) {
this.table = table;
this.dynamoDBClient = dynamoDBClient;
this.serializer = serializer;
this.consistentReads = consistentReads;
this.tableCreatorCallback = tableCreatorCallback;
this.dynamoDbRequestTimeout = dynamoDbRequestTimeout;
this.billingMode = billingMode;
}
/**
@ -148,7 +149,8 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
ProvisionedThroughput throughput = ProvisionedThroughput.builder().readCapacityUnits(readCapacity)
.writeCapacityUnits(writeCapacity).build();
CreateTableRequest request = CreateTableRequest.builder().tableName(table).keySchema(serializer.getKeySchema())
.attributeDefinitions(serializer.getAttributeDefinitions()).provisionedThroughput(throughput).build();
.attributeDefinitions(serializer.getAttributeDefinitions()).provisionedThroughput(throughput)
.billingMode(billingMode).build();
final AWSExceptionManager exceptionManager = createExceptionManager();
exceptionManager.add(ResourceInUseException.class, t -> t);