Configurable DynamoDB BillingMode (#582)
* Configurable DDB BillingModes for LeaseTables upon creation * Adding tests and fixing specifying DDB provisioning levels when billing by request.
This commit is contained in:
parent
80f5f68765
commit
8b7c1554cb
8 changed files with 456 additions and 29 deletions
|
|
@ -28,6 +28,7 @@ import lombok.Data;
|
||||||
import lombok.NonNull;
|
import lombok.NonNull;
|
||||||
import lombok.experimental.Accessors;
|
import lombok.experimental.Accessors;
|
||||||
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
|
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
|
||||||
|
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
|
||||||
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
|
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
|
||||||
import software.amazon.kinesis.common.InitialPositionInStream;
|
import software.amazon.kinesis.common.InitialPositionInStream;
|
||||||
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
|
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
|
||||||
|
|
@ -165,6 +166,8 @@ public class LeaseManagementConfig {
|
||||||
|
|
||||||
private Duration dynamoDbRequestTimeout = DEFAULT_REQUEST_TIMEOUT;
|
private Duration dynamoDbRequestTimeout = DEFAULT_REQUEST_TIMEOUT;
|
||||||
|
|
||||||
|
private BillingMode billingMode = BillingMode.PROVISIONED;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The initial position for getting records from Kinesis streams.
|
* The initial position for getting records from Kinesis streams.
|
||||||
*
|
*
|
||||||
|
|
@ -267,7 +270,7 @@ public class LeaseManagementConfig {
|
||||||
initialLeaseTableReadCapacity(),
|
initialLeaseTableReadCapacity(),
|
||||||
initialLeaseTableWriteCapacity(),
|
initialLeaseTableWriteCapacity(),
|
||||||
hierarchicalShardSyncer(),
|
hierarchicalShardSyncer(),
|
||||||
tableCreatorCallback(), dynamoDbRequestTimeout());
|
tableCreatorCallback(), dynamoDbRequestTimeout(), billingMode());
|
||||||
}
|
}
|
||||||
return leaseManagementFactory;
|
return leaseManagementFactory;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -21,6 +21,7 @@ import java.util.concurrent.ExecutorService;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import lombok.NonNull;
|
import lombok.NonNull;
|
||||||
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
|
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
|
||||||
|
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
|
||||||
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
|
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
|
||||||
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
|
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
|
||||||
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
|
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
|
||||||
|
|
@ -75,6 +76,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
|
||||||
private final long initialLeaseTableWriteCapacity;
|
private final long initialLeaseTableWriteCapacity;
|
||||||
private final TableCreatorCallback tableCreatorCallback;
|
private final TableCreatorCallback tableCreatorCallback;
|
||||||
private final Duration dynamoDbRequestTimeout;
|
private final Duration dynamoDbRequestTimeout;
|
||||||
|
private final BillingMode billingMode;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor.
|
* Constructor.
|
||||||
|
|
@ -254,6 +256,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
|
||||||
* @param tableCreatorCallback
|
* @param tableCreatorCallback
|
||||||
* @param dynamoDbRequestTimeout
|
* @param dynamoDbRequestTimeout
|
||||||
*/
|
*/
|
||||||
|
@Deprecated
|
||||||
public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final String streamName,
|
public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final String streamName,
|
||||||
final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier,
|
final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier,
|
||||||
final ExecutorService executorService, final InitialPositionInStreamExtended initialPositionInStream,
|
final ExecutorService executorService, final InitialPositionInStreamExtended initialPositionInStream,
|
||||||
|
|
@ -266,6 +269,58 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
|
||||||
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity,
|
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity,
|
||||||
final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
|
final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
|
||||||
Duration dynamoDbRequestTimeout) {
|
Duration dynamoDbRequestTimeout) {
|
||||||
|
this(kinesisClient, streamName, dynamoDBClient, tableName, workerIdentifier, executorService,
|
||||||
|
initialPositionInStream, failoverTimeMillis, epsilonMillis, maxLeasesForWorker,
|
||||||
|
maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion,
|
||||||
|
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
|
||||||
|
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
|
||||||
|
cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity,
|
||||||
|
hierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, BillingMode.PROVISIONED);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor.
|
||||||
|
*
|
||||||
|
* @param kinesisClient
|
||||||
|
* @param streamName
|
||||||
|
* @param dynamoDBClient
|
||||||
|
* @param tableName
|
||||||
|
* @param workerIdentifier
|
||||||
|
* @param executorService
|
||||||
|
* @param initialPositionInStream
|
||||||
|
* @param failoverTimeMillis
|
||||||
|
* @param epsilonMillis
|
||||||
|
* @param maxLeasesForWorker
|
||||||
|
* @param maxLeasesToStealAtOneTime
|
||||||
|
* @param maxLeaseRenewalThreads
|
||||||
|
* @param cleanupLeasesUponShardCompletion
|
||||||
|
* @param ignoreUnexpectedChildShards
|
||||||
|
* @param shardSyncIntervalMillis
|
||||||
|
* @param consistentReads
|
||||||
|
* @param listShardsBackoffTimeMillis
|
||||||
|
* @param maxListShardsRetryAttempts
|
||||||
|
* @param maxCacheMissesBeforeReload
|
||||||
|
* @param listShardsCacheAllowedAgeInSeconds
|
||||||
|
* @param cacheMissWarningModulus
|
||||||
|
* @param initialLeaseTableReadCapacity
|
||||||
|
* @param initialLeaseTableWriteCapacity
|
||||||
|
* @param hierarchicalShardSyncer
|
||||||
|
* @param tableCreatorCallback
|
||||||
|
* @param dynamoDbRequestTimeout
|
||||||
|
* @param billingMode
|
||||||
|
*/
|
||||||
|
public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final String streamName,
|
||||||
|
final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier,
|
||||||
|
final ExecutorService executorService, final InitialPositionInStreamExtended initialPositionInStream,
|
||||||
|
final long failoverTimeMillis, final long epsilonMillis, final int maxLeasesForWorker,
|
||||||
|
final int maxLeasesToStealAtOneTime, final int maxLeaseRenewalThreads,
|
||||||
|
final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards,
|
||||||
|
final long shardSyncIntervalMillis, final boolean consistentReads, final long listShardsBackoffTimeMillis,
|
||||||
|
final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload,
|
||||||
|
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus,
|
||||||
|
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity,
|
||||||
|
final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
|
||||||
|
Duration dynamoDbRequestTimeout, BillingMode billingMode) {
|
||||||
this.kinesisClient = kinesisClient;
|
this.kinesisClient = kinesisClient;
|
||||||
this.streamName = streamName;
|
this.streamName = streamName;
|
||||||
this.dynamoDBClient = dynamoDBClient;
|
this.dynamoDBClient = dynamoDBClient;
|
||||||
|
|
@ -292,6 +347,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
|
||||||
this.hierarchicalShardSyncer = hierarchicalShardSyncer;
|
this.hierarchicalShardSyncer = hierarchicalShardSyncer;
|
||||||
this.tableCreatorCallback = tableCreatorCallback;
|
this.tableCreatorCallback = tableCreatorCallback;
|
||||||
this.dynamoDbRequestTimeout = dynamoDbRequestTimeout;
|
this.dynamoDbRequestTimeout = dynamoDbRequestTimeout;
|
||||||
|
this.billingMode = billingMode;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
@ -324,7 +380,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
|
||||||
@Override
|
@Override
|
||||||
public DynamoDBLeaseRefresher createLeaseRefresher() {
|
public DynamoDBLeaseRefresher createLeaseRefresher() {
|
||||||
return new DynamoDBLeaseRefresher(tableName, dynamoDBClient, new DynamoDBLeaseSerializer(), consistentReads,
|
return new DynamoDBLeaseRefresher(tableName, dynamoDBClient, new DynamoDBLeaseSerializer(), consistentReads,
|
||||||
tableCreatorCallback, dynamoDbRequestTimeout);
|
tableCreatorCallback, dynamoDbRequestTimeout, billingMode);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
||||||
|
|
@ -25,26 +25,7 @@ import java.util.concurrent.TimeoutException;
|
||||||
import lombok.NonNull;
|
import lombok.NonNull;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
|
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
|
||||||
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
|
import software.amazon.awssdk.services.dynamodb.model.*;
|
||||||
import software.amazon.awssdk.services.dynamodb.model.AttributeValueUpdate;
|
|
||||||
import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
|
|
||||||
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
|
|
||||||
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
|
|
||||||
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
|
|
||||||
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
|
|
||||||
import software.amazon.awssdk.services.dynamodb.model.DynamoDbException;
|
|
||||||
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
|
|
||||||
import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
|
|
||||||
import software.amazon.awssdk.services.dynamodb.model.LimitExceededException;
|
|
||||||
import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput;
|
|
||||||
import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughputExceededException;
|
|
||||||
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
|
|
||||||
import software.amazon.awssdk.services.dynamodb.model.ResourceInUseException;
|
|
||||||
import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
|
|
||||||
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
|
|
||||||
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
|
|
||||||
import software.amazon.awssdk.services.dynamodb.model.TableStatus;
|
|
||||||
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
|
|
||||||
import software.amazon.awssdk.utils.CollectionUtils;
|
import software.amazon.awssdk.utils.CollectionUtils;
|
||||||
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
|
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
|
||||||
import software.amazon.kinesis.common.FutureUtils;
|
import software.amazon.kinesis.common.FutureUtils;
|
||||||
|
|
@ -57,6 +38,7 @@ import software.amazon.kinesis.leases.exceptions.InvalidStateException;
|
||||||
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
|
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
|
||||||
import software.amazon.kinesis.retrieval.AWSExceptionManager;
|
import software.amazon.kinesis.retrieval.AWSExceptionManager;
|
||||||
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
|
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
|
||||||
|
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An implementation of {@link LeaseRefresher} that uses DynamoDB.
|
* An implementation of {@link LeaseRefresher} that uses DynamoDB.
|
||||||
|
|
@ -72,6 +54,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
|
||||||
private final TableCreatorCallback tableCreatorCallback;
|
private final TableCreatorCallback tableCreatorCallback;
|
||||||
|
|
||||||
private final Duration dynamoDbRequestTimeout;
|
private final Duration dynamoDbRequestTimeout;
|
||||||
|
private final BillingMode billingMode;
|
||||||
|
|
||||||
private boolean newTableCreated = false;
|
private boolean newTableCreated = false;
|
||||||
|
|
||||||
|
|
@ -118,15 +101,34 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
|
||||||
* @param tableCreatorCallback
|
* @param tableCreatorCallback
|
||||||
* @param dynamoDbRequestTimeout
|
* @param dynamoDbRequestTimeout
|
||||||
*/
|
*/
|
||||||
|
@Deprecated
|
||||||
public DynamoDBLeaseRefresher(final String table, final DynamoDbAsyncClient dynamoDBClient,
|
public DynamoDBLeaseRefresher(final String table, final DynamoDbAsyncClient dynamoDBClient,
|
||||||
final LeaseSerializer serializer, final boolean consistentReads,
|
final LeaseSerializer serializer, final boolean consistentReads,
|
||||||
@NonNull final TableCreatorCallback tableCreatorCallback, Duration dynamoDbRequestTimeout) {
|
@NonNull final TableCreatorCallback tableCreatorCallback, Duration dynamoDbRequestTimeout) {
|
||||||
|
this(table, dynamoDBClient, serializer, consistentReads, tableCreatorCallback, dynamoDbRequestTimeout, BillingMode.PROVISIONED);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor.
|
||||||
|
* @param table
|
||||||
|
* @param dynamoDBClient
|
||||||
|
* @param serializer
|
||||||
|
* @param consistentReads
|
||||||
|
* @param tableCreatorCallback
|
||||||
|
* @param dynamoDbRequestTimeout
|
||||||
|
* @param billingMode
|
||||||
|
*/
|
||||||
|
public DynamoDBLeaseRefresher(final String table, final DynamoDbAsyncClient dynamoDBClient,
|
||||||
|
final LeaseSerializer serializer, final boolean consistentReads,
|
||||||
|
@NonNull final TableCreatorCallback tableCreatorCallback, Duration dynamoDbRequestTimeout,
|
||||||
|
final BillingMode billingMode) {
|
||||||
this.table = table;
|
this.table = table;
|
||||||
this.dynamoDBClient = dynamoDBClient;
|
this.dynamoDBClient = dynamoDBClient;
|
||||||
this.serializer = serializer;
|
this.serializer = serializer;
|
||||||
this.consistentReads = consistentReads;
|
this.consistentReads = consistentReads;
|
||||||
this.tableCreatorCallback = tableCreatorCallback;
|
this.tableCreatorCallback = tableCreatorCallback;
|
||||||
this.dynamoDbRequestTimeout = dynamoDbRequestTimeout;
|
this.dynamoDbRequestTimeout = dynamoDbRequestTimeout;
|
||||||
|
this.billingMode = billingMode;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -147,8 +149,17 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
|
||||||
}
|
}
|
||||||
ProvisionedThroughput throughput = ProvisionedThroughput.builder().readCapacityUnits(readCapacity)
|
ProvisionedThroughput throughput = ProvisionedThroughput.builder().readCapacityUnits(readCapacity)
|
||||||
.writeCapacityUnits(writeCapacity).build();
|
.writeCapacityUnits(writeCapacity).build();
|
||||||
CreateTableRequest request = CreateTableRequest.builder().tableName(table).keySchema(serializer.getKeySchema())
|
final CreateTableRequest request;
|
||||||
.attributeDefinitions(serializer.getAttributeDefinitions()).provisionedThroughput(throughput).build();
|
if(BillingMode.PAY_PER_REQUEST.equals(billingMode)){
|
||||||
|
request = CreateTableRequest.builder().tableName(table).keySchema(serializer.getKeySchema())
|
||||||
|
.attributeDefinitions(serializer.getAttributeDefinitions())
|
||||||
|
.billingMode(billingMode).build();
|
||||||
|
}else{
|
||||||
|
request = CreateTableRequest.builder().tableName(table).keySchema(serializer.getKeySchema())
|
||||||
|
.attributeDefinitions(serializer.getAttributeDefinitions()).provisionedThroughput(throughput)
|
||||||
|
.billingMode(billingMode).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
final AWSExceptionManager exceptionManager = createExceptionManager();
|
final AWSExceptionManager exceptionManager = createExceptionManager();
|
||||||
exceptionManager.add(ResourceInUseException.class, t -> t);
|
exceptionManager.add(ResourceInUseException.class, t -> t);
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,37 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2019 Amazon.com, Inc. or its affiliates.
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package software.amazon.kinesis.leases;
|
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.junit.Rule;
|
||||||
|
import org.junit.rules.TestWatcher;
|
||||||
|
import org.junit.runner.Description;
|
||||||
|
import org.mockito.Mock;
|
||||||
|
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
|
||||||
|
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
|
||||||
|
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
|
||||||
|
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRefresher;
|
||||||
|
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseSerializer;
|
||||||
|
import software.amazon.kinesis.leases.dynamodb.TableCreatorCallback;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
public class LeaseIntegrationBillingModePayPerRequestTest extends LeaseIntegrationTest {
|
||||||
|
@Override
|
||||||
|
protected DynamoDBLeaseRefresher getLeaseRefresher() {
|
||||||
|
return new DynamoDBLeaseRefresher(tableName+"Per-Request", ddbClient, leaseSerializer, true,
|
||||||
|
tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, BillingMode.PAY_PER_REQUEST);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
@ -22,13 +22,14 @@ import org.mockito.Mock;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
|
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
|
||||||
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
|
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
|
||||||
|
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
|
||||||
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRefresher;
|
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRefresher;
|
||||||
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseSerializer;
|
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseSerializer;
|
||||||
import software.amazon.kinesis.leases.dynamodb.TableCreatorCallback;
|
import software.amazon.kinesis.leases.dynamodb.TableCreatorCallback;
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class LeaseIntegrationTest {
|
public class LeaseIntegrationTest {
|
||||||
private LeaseSerializer leaseSerializer = new DynamoDBLeaseSerializer();
|
protected LeaseSerializer leaseSerializer = new DynamoDBLeaseSerializer();
|
||||||
|
|
||||||
protected static DynamoDBLeaseRefresher leaseRefresher;
|
protected static DynamoDBLeaseRefresher leaseRefresher;
|
||||||
protected static DynamoDbAsyncClient ddbClient = DynamoDbAsyncClient.builder()
|
protected static DynamoDbAsyncClient ddbClient = DynamoDbAsyncClient.builder()
|
||||||
|
|
@ -47,8 +48,7 @@ public class LeaseIntegrationTest {
|
||||||
if (leaseRefresher == null) {
|
if (leaseRefresher == null) {
|
||||||
// Do some static setup once per class.
|
// Do some static setup once per class.
|
||||||
|
|
||||||
leaseRefresher = new DynamoDBLeaseRefresher(tableName, ddbClient, leaseSerializer, true,
|
leaseRefresher = getLeaseRefresher();
|
||||||
tableCreatorCallback);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
|
@ -72,5 +72,10 @@ public class LeaseIntegrationTest {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
protected DynamoDBLeaseRefresher getLeaseRefresher() {
|
||||||
|
return new DynamoDBLeaseRefresher(tableName, ddbClient, leaseSerializer, true,
|
||||||
|
tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, BillingMode.PROVISIONED);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -56,10 +56,12 @@ import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
|
||||||
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
|
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
|
||||||
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
|
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
|
||||||
import software.amazon.awssdk.services.dynamodb.model.UpdateItemResponse;
|
import software.amazon.awssdk.services.dynamodb.model.UpdateItemResponse;
|
||||||
|
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
|
||||||
|
|
||||||
import software.amazon.kinesis.leases.Lease;
|
import software.amazon.kinesis.leases.Lease;
|
||||||
|
import software.amazon.kinesis.leases.LeaseManagementConfig;
|
||||||
import software.amazon.kinesis.leases.LeaseSerializer;
|
import software.amazon.kinesis.leases.LeaseSerializer;
|
||||||
import software.amazon.kinesis.leases.exceptions.DependencyException;
|
import software.amazon.kinesis.leases.exceptions.DependencyException;
|
||||||
|
|
||||||
@RunWith(MockitoJUnitRunner.class)
|
@RunWith(MockitoJUnitRunner.class)
|
||||||
public class DynamoDBLeaseRefresherTest {
|
public class DynamoDBLeaseRefresherTest {
|
||||||
|
|
||||||
|
|
@ -232,6 +234,23 @@ public class DynamoDBLeaseRefresherTest {
|
||||||
verifyCancel(mockCreateTableFuture, () -> leaseRefresher.createLeaseTableIfNotExists(10L, 10L));
|
verifyCancel(mockCreateTableFuture, () -> leaseRefresher.createLeaseTableIfNotExists(10L, 10L));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCreateLeaseTableBillingMode() throws Exception {
|
||||||
|
leaseRefresher = new DynamoDBLeaseRefresher(TABLE_NAME, dynamoDbClient, leaseSerializer, CONSISTENT_READS,
|
||||||
|
tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, BillingMode.PAY_PER_REQUEST);
|
||||||
|
|
||||||
|
TimeoutException te = setRuleForDependencyTimeout();
|
||||||
|
|
||||||
|
when(dynamoDbClient.describeTable(any(DescribeTableRequest.class))).thenReturn(mockDescribeTableFuture);
|
||||||
|
when(mockDescribeTableFuture.get(anyLong(), any()))
|
||||||
|
.thenThrow(ResourceNotFoundException.builder().message("Table doesn't exist").build());
|
||||||
|
|
||||||
|
when(dynamoDbClient.createTable(any(CreateTableRequest.class))).thenReturn(mockCreateTableFuture);
|
||||||
|
when(mockCreateTableFuture.get(anyLong(), any())).thenThrow(te);
|
||||||
|
|
||||||
|
verifyCancel(mockCreateTableFuture, () -> leaseRefresher.createLeaseTableIfNotExists(10L, 10L));
|
||||||
|
}
|
||||||
|
|
||||||
@FunctionalInterface
|
@FunctionalInterface
|
||||||
private interface TestCaller {
|
private interface TestCaller {
|
||||||
void call() throws Exception;
|
void call() throws Exception;
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,281 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2019 Amazon.com, Inc. or its affiliates.
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package software.amazon.kinesis.leases.dynamodb;
|
||||||
|
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.mockito.runners.MockitoJUnitRunner;
|
||||||
|
import software.amazon.kinesis.leases.Lease;
|
||||||
|
import software.amazon.kinesis.leases.LeaseIntegrationBillingModePayPerRequestTest;
|
||||||
|
import software.amazon.kinesis.leases.LeaseRenewer;
|
||||||
|
import software.amazon.kinesis.leases.exceptions.LeasingException;
|
||||||
|
import software.amazon.kinesis.metrics.NullMetricsFactory;
|
||||||
|
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
|
||||||
|
import static org.hamcrest.CoreMatchers.equalTo;
|
||||||
|
import static org.hamcrest.CoreMatchers.notNullValue;
|
||||||
|
import static org.hamcrest.CoreMatchers.nullValue;
|
||||||
|
import static org.junit.Assert.assertThat;
|
||||||
|
|
||||||
|
@RunWith(MockitoJUnitRunner.class)
|
||||||
|
public class DynamoDBLeaseRenewerIntegrationBillingModePayPerRequestTest extends
|
||||||
|
LeaseIntegrationBillingModePayPerRequestTest {
|
||||||
|
private final String TEST_METRIC = "TestOperation";
|
||||||
|
|
||||||
|
// This test case's leases last 2 seconds
|
||||||
|
private static final long LEASE_DURATION_MILLIS = 2000L;
|
||||||
|
|
||||||
|
private LeaseRenewer renewer;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() {
|
||||||
|
renewer = new DynamoDBLeaseRenewer(leaseRefresher, "foo", LEASE_DURATION_MILLIS,
|
||||||
|
Executors.newCachedThreadPool(), new NullMetricsFactory());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSimpleRenew() throws LeasingException {
|
||||||
|
TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher);
|
||||||
|
|
||||||
|
builder.withLease("1", "foo").build();
|
||||||
|
|
||||||
|
builder.addLeasesToRenew(renewer, "1");
|
||||||
|
builder.renewMutateAssert(renewer, "1");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLeaseLoss() throws LeasingException {
|
||||||
|
TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher);
|
||||||
|
|
||||||
|
builder.withLease("1", "foo").withLease("2", "foo").build();
|
||||||
|
|
||||||
|
builder.addLeasesToRenew(renewer, "1", "2");
|
||||||
|
Lease renewedLease = builder.renewMutateAssert(renewer, "1", "2").get("2");
|
||||||
|
|
||||||
|
// lose lease 2
|
||||||
|
leaseRefresher.takeLease(renewedLease, "bar");
|
||||||
|
|
||||||
|
builder.renewMutateAssert(renewer, "1");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testClear() throws LeasingException {
|
||||||
|
TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher);
|
||||||
|
|
||||||
|
builder.withLease("1", "foo").build();
|
||||||
|
builder.addLeasesToRenew(renewer, "1");
|
||||||
|
builder.renewMutateAssert(renewer, "1");
|
||||||
|
|
||||||
|
renewer.clearCurrentlyHeldLeases();
|
||||||
|
builder.renewMutateAssert(renewer);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetCurrentlyHeldLease() throws LeasingException {
|
||||||
|
TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher);
|
||||||
|
|
||||||
|
builder.withLease("1", "foo").build();
|
||||||
|
builder.addLeasesToRenew(renewer, "1");
|
||||||
|
builder.renewMutateAssert(renewer, "1");
|
||||||
|
|
||||||
|
// this should be a copy that doesn't get updated
|
||||||
|
Lease lease = renewer.getCurrentlyHeldLease("1");
|
||||||
|
assertThat(lease.leaseCounter(), equalTo(1L));
|
||||||
|
|
||||||
|
// do one renewal and make sure the old copy doesn't get updated
|
||||||
|
builder.renewMutateAssert(renewer, "1");
|
||||||
|
|
||||||
|
assertThat(lease.leaseCounter(), equalTo(1L));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetCurrentlyHeldLeases() throws LeasingException {
|
||||||
|
TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher);
|
||||||
|
|
||||||
|
builder.withLease("1", "foo").withLease("2", "foo").build();
|
||||||
|
builder.addLeasesToRenew(renewer, "1", "2");
|
||||||
|
Lease lease2 = builder.renewMutateAssert(renewer, "1", "2").get("2");
|
||||||
|
|
||||||
|
// This should be a copy that doesn't get updated
|
||||||
|
Map<String, Lease> heldLeases = renewer.getCurrentlyHeldLeases();
|
||||||
|
assertThat(heldLeases.size(), equalTo(2));
|
||||||
|
assertThat(heldLeases.get("1").leaseCounter(), equalTo(1L));
|
||||||
|
assertThat(heldLeases.get("2").leaseCounter(), equalTo(1L));
|
||||||
|
|
||||||
|
// lose lease 2
|
||||||
|
leaseRefresher.takeLease(lease2, "bar");
|
||||||
|
|
||||||
|
// Do another renewal and make sure the copy doesn't change
|
||||||
|
builder.renewMutateAssert(renewer, "1");
|
||||||
|
|
||||||
|
assertThat(heldLeases.size(), equalTo(2));
|
||||||
|
assertThat(heldLeases.get("1").leaseCounter(), equalTo(1L));
|
||||||
|
assertThat(heldLeases.get("2").leaseCounter(), equalTo(1L));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUpdateLease() throws LeasingException {
|
||||||
|
TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher);
|
||||||
|
|
||||||
|
builder.withLease("1", "foo").build();
|
||||||
|
|
||||||
|
builder.addLeasesToRenew(renewer, "1");
|
||||||
|
builder.renewMutateAssert(renewer, "1");
|
||||||
|
|
||||||
|
Lease expected = renewer.getCurrentlyHeldLease("1");
|
||||||
|
expected.checkpoint(new ExtendedSequenceNumber("new checkpoint"));
|
||||||
|
assertThat(renewer.updateLease(expected, expected.concurrencyToken(), TEST_METRIC, expected.leaseKey()),
|
||||||
|
equalTo(true));
|
||||||
|
|
||||||
|
// Assert that the counter and data have changed immediately after the update...
|
||||||
|
Lease actual = renewer.getCurrentlyHeldLease("1");
|
||||||
|
expected.leaseCounter(expected.leaseCounter() + 1);
|
||||||
|
assertThat(actual, equalTo(expected));
|
||||||
|
|
||||||
|
// ...and after another round of renewal
|
||||||
|
renewer.renewLeases();
|
||||||
|
actual = renewer.getCurrentlyHeldLease("1");
|
||||||
|
expected.leaseCounter(expected.leaseCounter() + 1);
|
||||||
|
assertThat(actual, equalTo(expected));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUpdateLostLease() throws LeasingException {
|
||||||
|
TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher);
|
||||||
|
|
||||||
|
builder.withLease("1", "foo").build();
|
||||||
|
|
||||||
|
builder.addLeasesToRenew(renewer, "1");
|
||||||
|
builder.renewMutateAssert(renewer, "1");
|
||||||
|
|
||||||
|
Lease lease = renewer.getCurrentlyHeldLease("1");
|
||||||
|
|
||||||
|
// cause lease loss such that the renewer doesn't realize he's lost the lease when update is called
|
||||||
|
leaseRefresher.renewLease(lease);
|
||||||
|
|
||||||
|
// renewer still thinks he has the lease
|
||||||
|
assertThat(renewer.getCurrentlyHeldLease("1"), notNullValue());
|
||||||
|
lease.checkpoint(new ExtendedSequenceNumber("new checkpoint"));
|
||||||
|
|
||||||
|
// update fails
|
||||||
|
assertThat(renewer.updateLease(lease, lease.concurrencyToken(), TEST_METRIC, null), equalTo(false));
|
||||||
|
// renewer no longer thinks he has the lease
|
||||||
|
assertThat(renewer.getCurrentlyHeldLease("1"), nullValue());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUpdateOldLease() throws LeasingException {
|
||||||
|
TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher);
|
||||||
|
|
||||||
|
builder.withLease("1", "foo").build();
|
||||||
|
|
||||||
|
builder.addLeasesToRenew(renewer, "1");
|
||||||
|
builder.renewMutateAssert(renewer, "1");
|
||||||
|
|
||||||
|
Lease lease = renewer.getCurrentlyHeldLease("1");
|
||||||
|
|
||||||
|
// cause lease loss such that the renewer knows the lease has been lost when update is called
|
||||||
|
leaseRefresher.takeLease(lease, "bar");
|
||||||
|
builder.renewMutateAssert(renewer);
|
||||||
|
|
||||||
|
lease.checkpoint(new ExtendedSequenceNumber("new checkpoint"));
|
||||||
|
assertThat(renewer.updateLease(lease, lease.concurrencyToken(), TEST_METRIC, lease.leaseKey()), equalTo(false));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUpdateRegainedLease() throws LeasingException {
|
||||||
|
TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher);
|
||||||
|
|
||||||
|
builder.withLease("1", "foo").build();
|
||||||
|
|
||||||
|
builder.addLeasesToRenew(renewer, "1");
|
||||||
|
builder.renewMutateAssert(renewer, "1");
|
||||||
|
|
||||||
|
Lease lease = renewer.getCurrentlyHeldLease("1");
|
||||||
|
|
||||||
|
// cause lease loss such that the renewer knows the lease has been lost when update is called
|
||||||
|
leaseRefresher.takeLease(lease, "bar");
|
||||||
|
builder.renewMutateAssert(renewer);
|
||||||
|
|
||||||
|
// regain the lease
|
||||||
|
builder.addLeasesToRenew(renewer, "1");
|
||||||
|
|
||||||
|
lease.checkpoint(new ExtendedSequenceNumber("new checkpoint"));
|
||||||
|
assertThat(renewer.updateLease(lease, lease.concurrencyToken(), TEST_METRIC, lease.leaseKey()), equalTo(false));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testIgnoreNoRenewalTimestamp() throws LeasingException {
|
||||||
|
TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher);
|
||||||
|
|
||||||
|
Lease lease = builder.withLease("1", "foo").build().get("1");
|
||||||
|
lease.lastCounterIncrementNanos(null);
|
||||||
|
|
||||||
|
renewer.addLeasesToRenew(Collections.singleton(lease));
|
||||||
|
|
||||||
|
assertThat(renewer.getCurrentlyHeldLeases().size(), equalTo(0));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLeaseTimeout() throws LeasingException, InterruptedException {
|
||||||
|
TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher);
|
||||||
|
|
||||||
|
builder.withLease("1", "foo").build();
|
||||||
|
|
||||||
|
builder.addLeasesToRenew(renewer, "1");
|
||||||
|
builder.renewMutateAssert(renewer, "1");
|
||||||
|
|
||||||
|
// TODO: Worth eliminating this sleep using the same pattern we used on LeaseTaker?
|
||||||
|
Thread.sleep(LEASE_DURATION_MILLIS); // Wait for the lease to timeout
|
||||||
|
|
||||||
|
assertThat(renewer.getCurrentlyHeldLeases().size(), equalTo(0));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInitialize() throws LeasingException {
|
||||||
|
final String shardId = "shd-0-0";
|
||||||
|
final String owner = "foo:8000";
|
||||||
|
|
||||||
|
TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher);
|
||||||
|
builder.withLease(shardId, owner);
|
||||||
|
Map<String, Lease> leases = builder.build();
|
||||||
|
DynamoDBLeaseRenewer renewer = new DynamoDBLeaseRenewer(leaseRefresher, owner, 30000L,
|
||||||
|
Executors.newCachedThreadPool(), new NullMetricsFactory());
|
||||||
|
renewer.initialize();
|
||||||
|
Map<String, Lease> heldLeases = renewer.getCurrentlyHeldLeases();
|
||||||
|
assertThat(heldLeases.size(), equalTo(leases.size()));
|
||||||
|
assertThat(heldLeases.keySet(), equalTo(leases.keySet()));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInitializeBillingMode() throws LeasingException {
|
||||||
|
final String shardId = "shd-0-0";
|
||||||
|
final String owner = "foo:8000";
|
||||||
|
TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher);
|
||||||
|
builder.withLease(shardId, owner);
|
||||||
|
Map<String, Lease> leases = builder.build();
|
||||||
|
DynamoDBLeaseRenewer renewer = new DynamoDBLeaseRenewer(leaseRefresher, owner, 30000L,
|
||||||
|
Executors.newCachedThreadPool(), new NullMetricsFactory());
|
||||||
|
renewer.initialize();
|
||||||
|
Map<String, Lease> heldLeases = renewer.getCurrentlyHeldLeases();
|
||||||
|
assertThat(heldLeases.size(), equalTo(leases.size()));
|
||||||
|
assertThat(heldLeases.keySet(), equalTo(leases.keySet()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -262,4 +262,19 @@ public class DynamoDBLeaseRenewerIntegrationTest extends LeaseIntegrationTest {
|
||||||
assertThat(heldLeases.size(), equalTo(leases.size()));
|
assertThat(heldLeases.size(), equalTo(leases.size()));
|
||||||
assertThat(heldLeases.keySet(), equalTo(leases.keySet()));
|
assertThat(heldLeases.keySet(), equalTo(leases.keySet()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInitializeBillingMode() throws LeasingException {
|
||||||
|
final String shardId = "shd-0-0";
|
||||||
|
final String owner = "foo:8000";
|
||||||
|
TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher);
|
||||||
|
builder.withLease(shardId, owner);
|
||||||
|
Map<String, Lease> leases = builder.build();
|
||||||
|
DynamoDBLeaseRenewer renewer = new DynamoDBLeaseRenewer(leaseRefresher, owner, 30000L,
|
||||||
|
Executors.newCachedThreadPool(), new NullMetricsFactory());
|
||||||
|
renewer.initialize();
|
||||||
|
Map<String, Lease> heldLeases = renewer.getCurrentlyHeldLeases();
|
||||||
|
assertThat(heldLeases.size(), equalTo(leases.size()));
|
||||||
|
assertThat(heldLeases.keySet(), equalTo(leases.keySet()));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue