Create DynamoDB tables on On-Demand billing mode by default.
This will enable KCL to create the DynamoDB tables - Lease and ShardProgress with the On-Demand billing mode instead of provisioned billing mode.
This commit is contained in:
parent
e73a8a9f3a
commit
583415618f
13 changed files with 19 additions and 355 deletions
|
|
@ -196,7 +196,7 @@ public class LeaseManagementConfig {
|
||||||
|
|
||||||
private Duration dynamoDbRequestTimeout = DEFAULT_REQUEST_TIMEOUT;
|
private Duration dynamoDbRequestTimeout = DEFAULT_REQUEST_TIMEOUT;
|
||||||
|
|
||||||
private BillingMode billingMode = BillingMode.PROVISIONED;
|
private BillingMode billingMode = BillingMode.PAY_PER_REQUEST;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Frequency (in millis) of the auditor job to scan for partial leases in the lease table.
|
* Frequency (in millis) of the auditor job to scan for partial leases in the lease table.
|
||||||
|
|
|
||||||
|
|
@ -30,16 +30,13 @@ public interface LeaseRefresher {
|
||||||
/**
|
/**
|
||||||
* Creates the table that will store leases. Succeeds if table already exists.
|
* Creates the table that will store leases. Succeeds if table already exists.
|
||||||
*
|
*
|
||||||
* @param readCapacity
|
|
||||||
* @param writeCapacity
|
|
||||||
*
|
|
||||||
* @return true if we created a new table (table didn't exist before)
|
* @return true if we created a new table (table didn't exist before)
|
||||||
*
|
*
|
||||||
* @throws ProvisionedThroughputException if we cannot create the lease table due to per-AWS-account capacity
|
* @throws ProvisionedThroughputException if we cannot create the lease table due to per-AWS-account capacity
|
||||||
* restrictions.
|
* restrictions.
|
||||||
* @throws DependencyException if DynamoDB createTable fails in an unexpected way
|
* @throws DependencyException if DynamoDB createTable fails in an unexpected way
|
||||||
*/
|
*/
|
||||||
boolean createLeaseTableIfNotExists(Long readCapacity, Long writeCapacity)
|
boolean createLeaseTableIfNotExists()
|
||||||
throws ProvisionedThroughputException, DependencyException;
|
throws ProvisionedThroughputException, DependencyException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -211,10 +211,9 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
|
||||||
@Override
|
@Override
|
||||||
public void initialize() throws ProvisionedThroughputException, DependencyException, IllegalStateException {
|
public void initialize() throws ProvisionedThroughputException, DependencyException, IllegalStateException {
|
||||||
final boolean newTableCreated =
|
final boolean newTableCreated =
|
||||||
leaseRefresher.createLeaseTableIfNotExists(initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity);
|
leaseRefresher.createLeaseTableIfNotExists();
|
||||||
if (newTableCreated) {
|
if (newTableCreated) {
|
||||||
log.info("Created new lease table for coordinator with initial read capacity of {} and write capacity of {}.",
|
log.info("Created new lease table for coordinator with pay per request billing mode.");
|
||||||
initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity);
|
|
||||||
}
|
}
|
||||||
// Need to wait for table in active state.
|
// Need to wait for table in active state.
|
||||||
final long secondsBetweenPolls = 10L;
|
final long secondsBetweenPolls = 10L;
|
||||||
|
|
|
||||||
|
|
@ -285,7 +285,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
|
||||||
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
|
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
|
||||||
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
|
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
|
||||||
cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity,
|
cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity,
|
||||||
hierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, BillingMode.PROVISIONED);
|
hierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, BillingMode.PAY_PER_REQUEST);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -38,7 +38,6 @@ import software.amazon.awssdk.services.dynamodb.model.DynamoDbException;
|
||||||
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
|
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
|
||||||
import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
|
import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
|
||||||
import software.amazon.awssdk.services.dynamodb.model.LimitExceededException;
|
import software.amazon.awssdk.services.dynamodb.model.LimitExceededException;
|
||||||
import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput;
|
|
||||||
import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughputExceededException;
|
import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughputExceededException;
|
||||||
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
|
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
|
||||||
import software.amazon.awssdk.services.dynamodb.model.ResourceInUseException;
|
import software.amazon.awssdk.services.dynamodb.model.ResourceInUseException;
|
||||||
|
|
@ -130,7 +129,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
|
||||||
public DynamoDBLeaseRefresher(final String table, final DynamoDbAsyncClient dynamoDBClient,
|
public DynamoDBLeaseRefresher(final String table, final DynamoDbAsyncClient dynamoDBClient,
|
||||||
final LeaseSerializer serializer, final boolean consistentReads,
|
final LeaseSerializer serializer, final boolean consistentReads,
|
||||||
@NonNull final TableCreatorCallback tableCreatorCallback, Duration dynamoDbRequestTimeout) {
|
@NonNull final TableCreatorCallback tableCreatorCallback, Duration dynamoDbRequestTimeout) {
|
||||||
this(table, dynamoDBClient, serializer, consistentReads, tableCreatorCallback, dynamoDbRequestTimeout, BillingMode.PROVISIONED);
|
this(table, dynamoDBClient, serializer, consistentReads, tableCreatorCallback, dynamoDbRequestTimeout, BillingMode.PAY_PER_REQUEST);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -160,7 +159,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
|
||||||
* {@inheritDoc}
|
* {@inheritDoc}
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public boolean createLeaseTableIfNotExists(@NonNull final Long readCapacity, @NonNull final Long writeCapacity)
|
public boolean createLeaseTableIfNotExists()
|
||||||
throws ProvisionedThroughputException, DependencyException {
|
throws ProvisionedThroughputException, DependencyException {
|
||||||
try {
|
try {
|
||||||
if (tableStatus() != null) {
|
if (tableStatus() != null) {
|
||||||
|
|
@ -172,19 +171,9 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
|
||||||
//
|
//
|
||||||
log.error("Failed to get table status for {}", table, de);
|
log.error("Failed to get table status for {}", table, de);
|
||||||
}
|
}
|
||||||
ProvisionedThroughput throughput = ProvisionedThroughput.builder().readCapacityUnits(readCapacity)
|
final CreateTableRequest request = CreateTableRequest.builder().tableName(table).keySchema(serializer.getKeySchema())
|
||||||
.writeCapacityUnits(writeCapacity).build();
|
|
||||||
final CreateTableRequest request;
|
|
||||||
if(BillingMode.PAY_PER_REQUEST.equals(billingMode)){
|
|
||||||
request = CreateTableRequest.builder().tableName(table).keySchema(serializer.getKeySchema())
|
|
||||||
.attributeDefinitions(serializer.getAttributeDefinitions())
|
.attributeDefinitions(serializer.getAttributeDefinitions())
|
||||||
.billingMode(billingMode).build();
|
.billingMode(billingMode).build();
|
||||||
}else{
|
|
||||||
request = CreateTableRequest.builder().tableName(table).keySchema(serializer.getKeySchema())
|
|
||||||
.attributeDefinitions(serializer.getAttributeDefinitions()).provisionedThroughput(throughput)
|
|
||||||
.build();
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
final AWSExceptionManager exceptionManager = createExceptionManager();
|
final AWSExceptionManager exceptionManager = createExceptionManager();
|
||||||
exceptionManager.add(ResourceInUseException.class, t -> t);
|
exceptionManager.add(ResourceInUseException.class, t -> t);
|
||||||
|
|
|
||||||
|
|
@ -109,12 +109,12 @@ public class ExceptionThrowingLeaseRefresher implements LeaseRefresher {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean createLeaseTableIfNotExists(Long readCapacity, Long writeCapacity)
|
public boolean createLeaseTableIfNotExists()
|
||||||
throws ProvisionedThroughputException, DependencyException {
|
throws ProvisionedThroughputException, DependencyException {
|
||||||
throwExceptions("createLeaseTableIfNotExists",
|
throwExceptions("createLeaseTableIfNotExists",
|
||||||
ExceptionThrowingLeaseRefresherMethods.CREATELEASETABLEIFNOTEXISTS);
|
ExceptionThrowingLeaseRefresherMethods.CREATELEASETABLEIFNOTEXISTS);
|
||||||
|
|
||||||
return leaseRefresher.createLeaseTableIfNotExists(readCapacity, writeCapacity);
|
return leaseRefresher.createLeaseTableIfNotExists();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
||||||
|
|
@ -68,8 +68,7 @@ public class LeaseCoordinatorExerciser {
|
||||||
LeaseRefresher leaseRefresher = new DynamoDBLeaseRefresher("nagl_ShardProgress", dynamoDBClient,
|
LeaseRefresher leaseRefresher = new DynamoDBLeaseRefresher("nagl_ShardProgress", dynamoDBClient,
|
||||||
new DynamoDBLeaseSerializer(), true, TableCreatorCallback.NOOP_TABLE_CREATOR_CALLBACK);
|
new DynamoDBLeaseSerializer(), true, TableCreatorCallback.NOOP_TABLE_CREATOR_CALLBACK);
|
||||||
|
|
||||||
if (leaseRefresher.createLeaseTableIfNotExists(INITIAL_LEASE_TABLE_READ_CAPACITY,
|
if (leaseRefresher.createLeaseTableIfNotExists()) {
|
||||||
INITIAL_LEASE_TABLE_WRITE_CAPACITY)) {
|
|
||||||
log.info("Waiting for newly created lease table");
|
log.info("Waiting for newly created lease table");
|
||||||
if (!leaseRefresher.waitUntilLeaseTableExists(10, 300)) {
|
if (!leaseRefresher.waitUntilLeaseTableExists(10, 300)) {
|
||||||
log.error("Table was not created in time");
|
log.error("Table was not created in time");
|
||||||
|
|
|
||||||
|
|
@ -1,37 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright 2019 Amazon.com, Inc. or its affiliates.
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package software.amazon.kinesis.leases;
|
|
||||||
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.junit.Rule;
|
|
||||||
import org.junit.rules.TestWatcher;
|
|
||||||
import org.junit.runner.Description;
|
|
||||||
import org.mockito.Mock;
|
|
||||||
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
|
|
||||||
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
|
|
||||||
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
|
|
||||||
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRefresher;
|
|
||||||
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseSerializer;
|
|
||||||
import software.amazon.kinesis.leases.dynamodb.TableCreatorCallback;
|
|
||||||
|
|
||||||
@Slf4j
|
|
||||||
public class LeaseIntegrationBillingModePayPerRequestTest extends LeaseIntegrationTest {
|
|
||||||
@Override
|
|
||||||
protected DynamoDBLeaseRefresher getLeaseRefresher() {
|
|
||||||
return new DynamoDBLeaseRefresher(tableName+"Per-Request", ddbClient, leaseSerializer, true,
|
|
||||||
tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, BillingMode.PAY_PER_REQUEST);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
@ -54,7 +54,7 @@ public class LeaseIntegrationTest {
|
||||||
try {
|
try {
|
||||||
if (!leaseRefresher.leaseTableExists()) {
|
if (!leaseRefresher.leaseTableExists()) {
|
||||||
log.info("Creating lease table");
|
log.info("Creating lease table");
|
||||||
leaseRefresher.createLeaseTableIfNotExists(10L, 10L);
|
leaseRefresher.createLeaseTableIfNotExists();
|
||||||
|
|
||||||
leaseRefresher.waitUntilLeaseTableExists(10, 500);
|
leaseRefresher.waitUntilLeaseTableExists(10, 500);
|
||||||
}
|
}
|
||||||
|
|
@ -74,7 +74,7 @@ public class LeaseIntegrationTest {
|
||||||
|
|
||||||
protected DynamoDBLeaseRefresher getLeaseRefresher() {
|
protected DynamoDBLeaseRefresher getLeaseRefresher() {
|
||||||
return new DynamoDBLeaseRefresher(tableName, ddbClient, leaseSerializer, true,
|
return new DynamoDBLeaseRefresher(tableName, ddbClient, leaseSerializer, true,
|
||||||
tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, BillingMode.PROVISIONED);
|
tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, BillingMode.PAY_PER_REQUEST);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -111,9 +111,7 @@ public class ShardSyncTaskIntegrationTest {
|
||||||
@Test
|
@Test
|
||||||
public final void testCall() throws DependencyException, InvalidStateException, ProvisionedThroughputException {
|
public final void testCall() throws DependencyException, InvalidStateException, ProvisionedThroughputException {
|
||||||
if (!leaseRefresher.leaseTableExists()) {
|
if (!leaseRefresher.leaseTableExists()) {
|
||||||
final Long readCapacity = 10L;
|
leaseRefresher.createLeaseTableIfNotExists();
|
||||||
final Long writeCapacity = 10L;
|
|
||||||
leaseRefresher.createLeaseTableIfNotExists(readCapacity, writeCapacity);
|
|
||||||
}
|
}
|
||||||
leaseRefresher.deleteAll();
|
leaseRefresher.deleteAll();
|
||||||
Set<String> shardIds = shardDetector.listShards().stream().map(Shard::shardId).collect(Collectors.toSet());
|
Set<String> shardIds = shardDetector.listShards().stream().map(Shard::shardId).collect(Collectors.toSet());
|
||||||
|
|
|
||||||
|
|
@ -78,7 +78,7 @@ public class DynamoDBLeaseCoordinatorIntegrationTest {
|
||||||
leaseRefresher = new DynamoDBLeaseRefresher(TABLE_NAME, dynamoDBClient, new DynamoDBLeaseSerializer(),
|
leaseRefresher = new DynamoDBLeaseRefresher(TABLE_NAME, dynamoDBClient, new DynamoDBLeaseSerializer(),
|
||||||
useConsistentReads, TableCreatorCallback.NOOP_TABLE_CREATOR_CALLBACK);
|
useConsistentReads, TableCreatorCallback.NOOP_TABLE_CREATOR_CALLBACK);
|
||||||
}
|
}
|
||||||
leaseRefresher.createLeaseTableIfNotExists(10L, 10L);
|
leaseRefresher.createLeaseTableIfNotExists();
|
||||||
|
|
||||||
int retryLeft = ATTEMPTS;
|
int retryLeft = ATTEMPTS;
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -275,7 +275,7 @@ public class DynamoDBLeaseRefresherTest {
|
||||||
when(dynamoDbClient.createTable(any(CreateTableRequest.class))).thenReturn(mockCreateTableFuture);
|
when(dynamoDbClient.createTable(any(CreateTableRequest.class))).thenReturn(mockCreateTableFuture);
|
||||||
when(mockCreateTableFuture.get(anyLong(), any())).thenThrow(te);
|
when(mockCreateTableFuture.get(anyLong(), any())).thenThrow(te);
|
||||||
|
|
||||||
verifyCancel(mockCreateTableFuture, () -> leaseRefresher.createLeaseTableIfNotExists(10L, 10L));
|
verifyCancel(mockCreateTableFuture, () -> leaseRefresher.createLeaseTableIfNotExists());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
@ -292,7 +292,7 @@ public class DynamoDBLeaseRefresherTest {
|
||||||
when(dynamoDbClient.createTable(any(CreateTableRequest.class))).thenReturn(mockCreateTableFuture);
|
when(dynamoDbClient.createTable(any(CreateTableRequest.class))).thenReturn(mockCreateTableFuture);
|
||||||
when(mockCreateTableFuture.get(anyLong(), any())).thenThrow(te);
|
when(mockCreateTableFuture.get(anyLong(), any())).thenThrow(te);
|
||||||
|
|
||||||
verifyCancel(mockCreateTableFuture, () -> leaseRefresher.createLeaseTableIfNotExists(10L, 10L));
|
verifyCancel(mockCreateTableFuture, () -> leaseRefresher.createLeaseTableIfNotExists());
|
||||||
}
|
}
|
||||||
|
|
||||||
@FunctionalInterface
|
@FunctionalInterface
|
||||||
|
|
|
||||||
|
|
@ -1,281 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright 2019 Amazon.com, Inc. or its affiliates.
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package software.amazon.kinesis.leases.dynamodb;
|
|
||||||
|
|
||||||
import org.junit.Before;
|
|
||||||
import org.junit.Test;
|
|
||||||
import org.junit.runner.RunWith;
|
|
||||||
import org.mockito.runners.MockitoJUnitRunner;
|
|
||||||
import software.amazon.kinesis.leases.Lease;
|
|
||||||
import software.amazon.kinesis.leases.LeaseIntegrationBillingModePayPerRequestTest;
|
|
||||||
import software.amazon.kinesis.leases.LeaseRenewer;
|
|
||||||
import software.amazon.kinesis.leases.exceptions.LeasingException;
|
|
||||||
import software.amazon.kinesis.metrics.NullMetricsFactory;
|
|
||||||
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
|
|
||||||
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.concurrent.Executors;
|
|
||||||
|
|
||||||
import static org.hamcrest.CoreMatchers.equalTo;
|
|
||||||
import static org.hamcrest.CoreMatchers.notNullValue;
|
|
||||||
import static org.hamcrest.CoreMatchers.nullValue;
|
|
||||||
import static org.junit.Assert.assertThat;
|
|
||||||
|
|
||||||
@RunWith(MockitoJUnitRunner.class)
|
|
||||||
public class DynamoDBLeaseRenewerBillingModePayPerRequestIntegrationTest extends
|
|
||||||
LeaseIntegrationBillingModePayPerRequestTest {
|
|
||||||
private final String TEST_METRIC = "TestOperation";
|
|
||||||
|
|
||||||
// This test case's leases last 2 seconds
|
|
||||||
private static final long LEASE_DURATION_MILLIS = 2000L;
|
|
||||||
|
|
||||||
private LeaseRenewer renewer;
|
|
||||||
|
|
||||||
@Before
|
|
||||||
public void setup() {
|
|
||||||
renewer = new DynamoDBLeaseRenewer(leaseRefresher, "foo", LEASE_DURATION_MILLIS,
|
|
||||||
Executors.newCachedThreadPool(), new NullMetricsFactory());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testSimpleRenew() throws LeasingException {
|
|
||||||
TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher);
|
|
||||||
|
|
||||||
builder.withLease("1", "foo").build();
|
|
||||||
|
|
||||||
builder.addLeasesToRenew(renewer, "1");
|
|
||||||
builder.renewMutateAssert(renewer, "1");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testLeaseLoss() throws LeasingException {
|
|
||||||
TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher);
|
|
||||||
|
|
||||||
builder.withLease("1", "foo").withLease("2", "foo").build();
|
|
||||||
|
|
||||||
builder.addLeasesToRenew(renewer, "1", "2");
|
|
||||||
Lease renewedLease = builder.renewMutateAssert(renewer, "1", "2").get("2");
|
|
||||||
|
|
||||||
// lose lease 2
|
|
||||||
leaseRefresher.takeLease(renewedLease, "bar");
|
|
||||||
|
|
||||||
builder.renewMutateAssert(renewer, "1");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testClear() throws LeasingException {
|
|
||||||
TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher);
|
|
||||||
|
|
||||||
builder.withLease("1", "foo").build();
|
|
||||||
builder.addLeasesToRenew(renewer, "1");
|
|
||||||
builder.renewMutateAssert(renewer, "1");
|
|
||||||
|
|
||||||
renewer.clearCurrentlyHeldLeases();
|
|
||||||
builder.renewMutateAssert(renewer);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testGetCurrentlyHeldLease() throws LeasingException {
|
|
||||||
TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher);
|
|
||||||
|
|
||||||
builder.withLease("1", "foo").build();
|
|
||||||
builder.addLeasesToRenew(renewer, "1");
|
|
||||||
builder.renewMutateAssert(renewer, "1");
|
|
||||||
|
|
||||||
// this should be a copy that doesn't get updated
|
|
||||||
Lease lease = renewer.getCurrentlyHeldLease("1");
|
|
||||||
assertThat(lease.leaseCounter(), equalTo(1L));
|
|
||||||
|
|
||||||
// do one renewal and make sure the old copy doesn't get updated
|
|
||||||
builder.renewMutateAssert(renewer, "1");
|
|
||||||
|
|
||||||
assertThat(lease.leaseCounter(), equalTo(1L));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testGetCurrentlyHeldLeases() throws LeasingException {
|
|
||||||
TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher);
|
|
||||||
|
|
||||||
builder.withLease("1", "foo").withLease("2", "foo").build();
|
|
||||||
builder.addLeasesToRenew(renewer, "1", "2");
|
|
||||||
Lease lease2 = builder.renewMutateAssert(renewer, "1", "2").get("2");
|
|
||||||
|
|
||||||
// This should be a copy that doesn't get updated
|
|
||||||
Map<String, Lease> heldLeases = renewer.getCurrentlyHeldLeases();
|
|
||||||
assertThat(heldLeases.size(), equalTo(2));
|
|
||||||
assertThat(heldLeases.get("1").leaseCounter(), equalTo(1L));
|
|
||||||
assertThat(heldLeases.get("2").leaseCounter(), equalTo(1L));
|
|
||||||
|
|
||||||
// lose lease 2
|
|
||||||
leaseRefresher.takeLease(lease2, "bar");
|
|
||||||
|
|
||||||
// Do another renewal and make sure the copy doesn't change
|
|
||||||
builder.renewMutateAssert(renewer, "1");
|
|
||||||
|
|
||||||
assertThat(heldLeases.size(), equalTo(2));
|
|
||||||
assertThat(heldLeases.get("1").leaseCounter(), equalTo(1L));
|
|
||||||
assertThat(heldLeases.get("2").leaseCounter(), equalTo(1L));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testUpdateLease() throws LeasingException {
|
|
||||||
TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher);
|
|
||||||
|
|
||||||
builder.withLease("1", "foo").build();
|
|
||||||
|
|
||||||
builder.addLeasesToRenew(renewer, "1");
|
|
||||||
builder.renewMutateAssert(renewer, "1");
|
|
||||||
|
|
||||||
Lease expected = renewer.getCurrentlyHeldLease("1");
|
|
||||||
expected.checkpoint(new ExtendedSequenceNumber("new checkpoint"));
|
|
||||||
assertThat(renewer.updateLease(expected, expected.concurrencyToken(), TEST_METRIC, expected.leaseKey()),
|
|
||||||
equalTo(true));
|
|
||||||
|
|
||||||
// Assert that the counter and data have changed immediately after the update...
|
|
||||||
Lease actual = renewer.getCurrentlyHeldLease("1");
|
|
||||||
expected.leaseCounter(expected.leaseCounter() + 1);
|
|
||||||
assertThat(actual, equalTo(expected));
|
|
||||||
|
|
||||||
// ...and after another round of renewal
|
|
||||||
renewer.renewLeases();
|
|
||||||
actual = renewer.getCurrentlyHeldLease("1");
|
|
||||||
expected.leaseCounter(expected.leaseCounter() + 1);
|
|
||||||
assertThat(actual, equalTo(expected));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testUpdateLostLease() throws LeasingException {
|
|
||||||
TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher);
|
|
||||||
|
|
||||||
builder.withLease("1", "foo").build();
|
|
||||||
|
|
||||||
builder.addLeasesToRenew(renewer, "1");
|
|
||||||
builder.renewMutateAssert(renewer, "1");
|
|
||||||
|
|
||||||
Lease lease = renewer.getCurrentlyHeldLease("1");
|
|
||||||
|
|
||||||
// cause lease loss such that the renewer doesn't realize he's lost the lease when update is called
|
|
||||||
leaseRefresher.renewLease(lease);
|
|
||||||
|
|
||||||
// renewer still thinks he has the lease
|
|
||||||
assertThat(renewer.getCurrentlyHeldLease("1"), notNullValue());
|
|
||||||
lease.checkpoint(new ExtendedSequenceNumber("new checkpoint"));
|
|
||||||
|
|
||||||
// update fails
|
|
||||||
assertThat(renewer.updateLease(lease, lease.concurrencyToken(), TEST_METRIC, null), equalTo(false));
|
|
||||||
// renewer no longer thinks he has the lease
|
|
||||||
assertThat(renewer.getCurrentlyHeldLease("1"), nullValue());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testUpdateOldLease() throws LeasingException {
|
|
||||||
TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher);
|
|
||||||
|
|
||||||
builder.withLease("1", "foo").build();
|
|
||||||
|
|
||||||
builder.addLeasesToRenew(renewer, "1");
|
|
||||||
builder.renewMutateAssert(renewer, "1");
|
|
||||||
|
|
||||||
Lease lease = renewer.getCurrentlyHeldLease("1");
|
|
||||||
|
|
||||||
// cause lease loss such that the renewer knows the lease has been lost when update is called
|
|
||||||
leaseRefresher.takeLease(lease, "bar");
|
|
||||||
builder.renewMutateAssert(renewer);
|
|
||||||
|
|
||||||
lease.checkpoint(new ExtendedSequenceNumber("new checkpoint"));
|
|
||||||
assertThat(renewer.updateLease(lease, lease.concurrencyToken(), TEST_METRIC, lease.leaseKey()), equalTo(false));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testUpdateRegainedLease() throws LeasingException {
|
|
||||||
TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher);
|
|
||||||
|
|
||||||
builder.withLease("1", "foo").build();
|
|
||||||
|
|
||||||
builder.addLeasesToRenew(renewer, "1");
|
|
||||||
builder.renewMutateAssert(renewer, "1");
|
|
||||||
|
|
||||||
Lease lease = renewer.getCurrentlyHeldLease("1");
|
|
||||||
|
|
||||||
// cause lease loss such that the renewer knows the lease has been lost when update is called
|
|
||||||
leaseRefresher.takeLease(lease, "bar");
|
|
||||||
builder.renewMutateAssert(renewer);
|
|
||||||
|
|
||||||
// regain the lease
|
|
||||||
builder.addLeasesToRenew(renewer, "1");
|
|
||||||
|
|
||||||
lease.checkpoint(new ExtendedSequenceNumber("new checkpoint"));
|
|
||||||
assertThat(renewer.updateLease(lease, lease.concurrencyToken(), TEST_METRIC, lease.leaseKey()), equalTo(false));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testIgnoreNoRenewalTimestamp() throws LeasingException {
|
|
||||||
TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher);
|
|
||||||
|
|
||||||
Lease lease = builder.withLease("1", "foo").build().get("1");
|
|
||||||
lease.lastCounterIncrementNanos(null);
|
|
||||||
|
|
||||||
renewer.addLeasesToRenew(Collections.singleton(lease));
|
|
||||||
|
|
||||||
assertThat(renewer.getCurrentlyHeldLeases().size(), equalTo(0));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testLeaseTimeout() throws LeasingException, InterruptedException {
|
|
||||||
TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher);
|
|
||||||
|
|
||||||
builder.withLease("1", "foo").build();
|
|
||||||
|
|
||||||
builder.addLeasesToRenew(renewer, "1");
|
|
||||||
builder.renewMutateAssert(renewer, "1");
|
|
||||||
|
|
||||||
// TODO: Worth eliminating this sleep using the same pattern we used on LeaseTaker?
|
|
||||||
Thread.sleep(LEASE_DURATION_MILLIS); // Wait for the lease to timeout
|
|
||||||
|
|
||||||
assertThat(renewer.getCurrentlyHeldLeases().size(), equalTo(0));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testInitialize() throws LeasingException {
|
|
||||||
final String shardId = "shd-0-0";
|
|
||||||
final String owner = "foo:8000";
|
|
||||||
|
|
||||||
TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher);
|
|
||||||
builder.withLease(shardId, owner);
|
|
||||||
Map<String, Lease> leases = builder.build();
|
|
||||||
DynamoDBLeaseRenewer renewer = new DynamoDBLeaseRenewer(leaseRefresher, owner, 30000L,
|
|
||||||
Executors.newCachedThreadPool(), new NullMetricsFactory());
|
|
||||||
renewer.initialize();
|
|
||||||
Map<String, Lease> heldLeases = renewer.getCurrentlyHeldLeases();
|
|
||||||
assertThat(heldLeases.size(), equalTo(leases.size()));
|
|
||||||
assertThat(heldLeases.keySet(), equalTo(leases.keySet()));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testInitializeBillingMode() throws LeasingException {
|
|
||||||
final String shardId = "shd-0-0";
|
|
||||||
final String owner = "foo:8000";
|
|
||||||
TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher);
|
|
||||||
builder.withLease(shardId, owner);
|
|
||||||
Map<String, Lease> leases = builder.build();
|
|
||||||
DynamoDBLeaseRenewer renewer = new DynamoDBLeaseRenewer(leaseRefresher, owner, 30000L,
|
|
||||||
Executors.newCachedThreadPool(), new NullMetricsFactory());
|
|
||||||
renewer.initialize();
|
|
||||||
Map<String, Lease> heldLeases = renewer.getCurrentlyHeldLeases();
|
|
||||||
assertThat(heldLeases.size(), equalTo(leases.size()));
|
|
||||||
assertThat(heldLeases.keySet(), equalTo(leases.keySet()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Loading…
Reference in a new issue