Adding tests and fixing specifying DDB provisioning levels when billing by request.

This commit is contained in:
Cory Bradshaw 2019-11-26 12:17:20 -08:00
parent 48ac2f27c8
commit 9d474fcfee
6 changed files with 373 additions and 7 deletions

View file

@ -38,6 +38,7 @@ import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.retrieval.AWSExceptionManager;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
/**
* An implementation of {@link LeaseRefresher} that uses DynamoDB.
@ -148,9 +149,17 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
}
ProvisionedThroughput throughput = ProvisionedThroughput.builder().readCapacityUnits(readCapacity)
.writeCapacityUnits(writeCapacity).build();
CreateTableRequest request = CreateTableRequest.builder().tableName(table).keySchema(serializer.getKeySchema())
.attributeDefinitions(serializer.getAttributeDefinitions()).provisionedThroughput(throughput)
.billingMode(billingMode).build();
final CreateTableRequest request;
if(BillingMode.PAY_PER_REQUEST.equals(billingMode)){
request = CreateTableRequest.builder().tableName(table).keySchema(serializer.getKeySchema())
.attributeDefinitions(serializer.getAttributeDefinitions())
.billingMode(billingMode).build();
}else{
request = CreateTableRequest.builder().tableName(table).keySchema(serializer.getKeySchema())
.attributeDefinitions(serializer.getAttributeDefinitions()).provisionedThroughput(throughput)
.billingMode(billingMode).build();
}
final AWSExceptionManager exceptionManager = createExceptionManager();
exceptionManager.add(ResourceInUseException.class, t -> t);

View file

@ -0,0 +1,37 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.leases;
import lombok.extern.slf4j.Slf4j;
import org.junit.Rule;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
import org.mockito.Mock;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRefresher;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseSerializer;
import software.amazon.kinesis.leases.dynamodb.TableCreatorCallback;
@Slf4j
public class LeaseIntegrationBillingModePayPerRequestTest extends LeaseIntegrationTest {
@Override
protected DynamoDBLeaseRefresher getLeaseRefresher() {
return new DynamoDBLeaseRefresher(tableName+"Per-Request", ddbClient, leaseSerializer, true,
tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, BillingMode.PAY_PER_REQUEST);
}
}

View file

@ -22,13 +22,14 @@ import org.mockito.Mock;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRefresher;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseSerializer;
import software.amazon.kinesis.leases.dynamodb.TableCreatorCallback;
@Slf4j
public class LeaseIntegrationTest {
private LeaseSerializer leaseSerializer = new DynamoDBLeaseSerializer();
protected LeaseSerializer leaseSerializer = new DynamoDBLeaseSerializer();
protected static DynamoDBLeaseRefresher leaseRefresher;
protected static DynamoDbAsyncClient ddbClient = DynamoDbAsyncClient.builder()
@ -47,8 +48,7 @@ public class LeaseIntegrationTest {
if (leaseRefresher == null) {
// Do some static setup once per class.
leaseRefresher = new DynamoDBLeaseRefresher(tableName, ddbClient, leaseSerializer, true,
tableCreatorCallback);
leaseRefresher = getLeaseRefresher();
}
try {
@ -72,5 +72,10 @@ public class LeaseIntegrationTest {
}
};
protected DynamoDBLeaseRefresher getLeaseRefresher() {
return new DynamoDBLeaseRefresher(tableName, ddbClient, leaseSerializer, true,
tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, BillingMode.PROVISIONED);
}
}

View file

@ -56,10 +56,12 @@ import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemResponse;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseManagementConfig;
import software.amazon.kinesis.leases.LeaseSerializer;
import software.amazon.kinesis.leases.exceptions.DependencyException;
@RunWith(MockitoJUnitRunner.class)
public class DynamoDBLeaseRefresherTest {
@ -232,6 +234,23 @@ public class DynamoDBLeaseRefresherTest {
verifyCancel(mockCreateTableFuture, () -> leaseRefresher.createLeaseTableIfNotExists(10L, 10L));
}
@Test
public void testCreateLeaseTableBillingMode() throws Exception {
leaseRefresher = new DynamoDBLeaseRefresher(TABLE_NAME, dynamoDbClient, leaseSerializer, CONSISTENT_READS,
tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, BillingMode.PAY_PER_REQUEST);
TimeoutException te = setRuleForDependencyTimeout();
when(dynamoDbClient.describeTable(any(DescribeTableRequest.class))).thenReturn(mockDescribeTableFuture);
when(mockDescribeTableFuture.get(anyLong(), any()))
.thenThrow(ResourceNotFoundException.builder().message("Table doesn't exist").build());
when(dynamoDbClient.createTable(any(CreateTableRequest.class))).thenReturn(mockCreateTableFuture);
when(mockCreateTableFuture.get(anyLong(), any())).thenThrow(te);
verifyCancel(mockCreateTableFuture, () -> leaseRefresher.createLeaseTableIfNotExists(10L, 10L));
}
@FunctionalInterface
private interface TestCaller {
void call() throws Exception;

View file

@ -0,0 +1,281 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.leases.dynamodb;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseIntegrationBillingModePayPerRequestTest;
import software.amazon.kinesis.leases.LeaseRenewer;
import software.amazon.kinesis.leases.exceptions.LeasingException;
import software.amazon.kinesis.metrics.NullMetricsFactory;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.Executors;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.junit.Assert.assertThat;
@RunWith(MockitoJUnitRunner.class)
public class DynamoDBLeaseRenewerIntegrationBillingModePayPerRequestTest extends
LeaseIntegrationBillingModePayPerRequestTest {
private final String TEST_METRIC = "TestOperation";
// This test case's leases last 2 seconds
private static final long LEASE_DURATION_MILLIS = 2000L;
private LeaseRenewer renewer;
@Before
public void setup() {
renewer = new DynamoDBLeaseRenewer(leaseRefresher, "foo", LEASE_DURATION_MILLIS,
Executors.newCachedThreadPool(), new NullMetricsFactory());
}
@Test
public void testSimpleRenew() throws LeasingException {
TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher);
builder.withLease("1", "foo").build();
builder.addLeasesToRenew(renewer, "1");
builder.renewMutateAssert(renewer, "1");
}
@Test
public void testLeaseLoss() throws LeasingException {
TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher);
builder.withLease("1", "foo").withLease("2", "foo").build();
builder.addLeasesToRenew(renewer, "1", "2");
Lease renewedLease = builder.renewMutateAssert(renewer, "1", "2").get("2");
// lose lease 2
leaseRefresher.takeLease(renewedLease, "bar");
builder.renewMutateAssert(renewer, "1");
}
@Test
public void testClear() throws LeasingException {
TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher);
builder.withLease("1", "foo").build();
builder.addLeasesToRenew(renewer, "1");
builder.renewMutateAssert(renewer, "1");
renewer.clearCurrentlyHeldLeases();
builder.renewMutateAssert(renewer);
}
@Test
public void testGetCurrentlyHeldLease() throws LeasingException {
TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher);
builder.withLease("1", "foo").build();
builder.addLeasesToRenew(renewer, "1");
builder.renewMutateAssert(renewer, "1");
// this should be a copy that doesn't get updated
Lease lease = renewer.getCurrentlyHeldLease("1");
assertThat(lease.leaseCounter(), equalTo(1L));
// do one renewal and make sure the old copy doesn't get updated
builder.renewMutateAssert(renewer, "1");
assertThat(lease.leaseCounter(), equalTo(1L));
}
@Test
public void testGetCurrentlyHeldLeases() throws LeasingException {
TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher);
builder.withLease("1", "foo").withLease("2", "foo").build();
builder.addLeasesToRenew(renewer, "1", "2");
Lease lease2 = builder.renewMutateAssert(renewer, "1", "2").get("2");
// This should be a copy that doesn't get updated
Map<String, Lease> heldLeases = renewer.getCurrentlyHeldLeases();
assertThat(heldLeases.size(), equalTo(2));
assertThat(heldLeases.get("1").leaseCounter(), equalTo(1L));
assertThat(heldLeases.get("2").leaseCounter(), equalTo(1L));
// lose lease 2
leaseRefresher.takeLease(lease2, "bar");
// Do another renewal and make sure the copy doesn't change
builder.renewMutateAssert(renewer, "1");
assertThat(heldLeases.size(), equalTo(2));
assertThat(heldLeases.get("1").leaseCounter(), equalTo(1L));
assertThat(heldLeases.get("2").leaseCounter(), equalTo(1L));
}
@Test
public void testUpdateLease() throws LeasingException {
TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher);
builder.withLease("1", "foo").build();
builder.addLeasesToRenew(renewer, "1");
builder.renewMutateAssert(renewer, "1");
Lease expected = renewer.getCurrentlyHeldLease("1");
expected.checkpoint(new ExtendedSequenceNumber("new checkpoint"));
assertThat(renewer.updateLease(expected, expected.concurrencyToken(), TEST_METRIC, expected.leaseKey()),
equalTo(true));
// Assert that the counter and data have changed immediately after the update...
Lease actual = renewer.getCurrentlyHeldLease("1");
expected.leaseCounter(expected.leaseCounter() + 1);
assertThat(actual, equalTo(expected));
// ...and after another round of renewal
renewer.renewLeases();
actual = renewer.getCurrentlyHeldLease("1");
expected.leaseCounter(expected.leaseCounter() + 1);
assertThat(actual, equalTo(expected));
}
@Test
public void testUpdateLostLease() throws LeasingException {
TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher);
builder.withLease("1", "foo").build();
builder.addLeasesToRenew(renewer, "1");
builder.renewMutateAssert(renewer, "1");
Lease lease = renewer.getCurrentlyHeldLease("1");
// cause lease loss such that the renewer doesn't realize he's lost the lease when update is called
leaseRefresher.renewLease(lease);
// renewer still thinks he has the lease
assertThat(renewer.getCurrentlyHeldLease("1"), notNullValue());
lease.checkpoint(new ExtendedSequenceNumber("new checkpoint"));
// update fails
assertThat(renewer.updateLease(lease, lease.concurrencyToken(), TEST_METRIC, null), equalTo(false));
// renewer no longer thinks he has the lease
assertThat(renewer.getCurrentlyHeldLease("1"), nullValue());
}
@Test
public void testUpdateOldLease() throws LeasingException {
TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher);
builder.withLease("1", "foo").build();
builder.addLeasesToRenew(renewer, "1");
builder.renewMutateAssert(renewer, "1");
Lease lease = renewer.getCurrentlyHeldLease("1");
// cause lease loss such that the renewer knows the lease has been lost when update is called
leaseRefresher.takeLease(lease, "bar");
builder.renewMutateAssert(renewer);
lease.checkpoint(new ExtendedSequenceNumber("new checkpoint"));
assertThat(renewer.updateLease(lease, lease.concurrencyToken(), TEST_METRIC, lease.leaseKey()), equalTo(false));
}
@Test
public void testUpdateRegainedLease() throws LeasingException {
TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher);
builder.withLease("1", "foo").build();
builder.addLeasesToRenew(renewer, "1");
builder.renewMutateAssert(renewer, "1");
Lease lease = renewer.getCurrentlyHeldLease("1");
// cause lease loss such that the renewer knows the lease has been lost when update is called
leaseRefresher.takeLease(lease, "bar");
builder.renewMutateAssert(renewer);
// regain the lease
builder.addLeasesToRenew(renewer, "1");
lease.checkpoint(new ExtendedSequenceNumber("new checkpoint"));
assertThat(renewer.updateLease(lease, lease.concurrencyToken(), TEST_METRIC, lease.leaseKey()), equalTo(false));
}
@Test
public void testIgnoreNoRenewalTimestamp() throws LeasingException {
TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher);
Lease lease = builder.withLease("1", "foo").build().get("1");
lease.lastCounterIncrementNanos(null);
renewer.addLeasesToRenew(Collections.singleton(lease));
assertThat(renewer.getCurrentlyHeldLeases().size(), equalTo(0));
}
@Test
public void testLeaseTimeout() throws LeasingException, InterruptedException {
TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher);
builder.withLease("1", "foo").build();
builder.addLeasesToRenew(renewer, "1");
builder.renewMutateAssert(renewer, "1");
// TODO: Worth eliminating this sleep using the same pattern we used on LeaseTaker?
Thread.sleep(LEASE_DURATION_MILLIS); // Wait for the lease to timeout
assertThat(renewer.getCurrentlyHeldLeases().size(), equalTo(0));
}
@Test
public void testInitialize() throws LeasingException {
final String shardId = "shd-0-0";
final String owner = "foo:8000";
TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher);
builder.withLease(shardId, owner);
Map<String, Lease> leases = builder.build();
DynamoDBLeaseRenewer renewer = new DynamoDBLeaseRenewer(leaseRefresher, owner, 30000L,
Executors.newCachedThreadPool(), new NullMetricsFactory());
renewer.initialize();
Map<String, Lease> heldLeases = renewer.getCurrentlyHeldLeases();
assertThat(heldLeases.size(), equalTo(leases.size()));
assertThat(heldLeases.keySet(), equalTo(leases.keySet()));
}
@Test
public void testInitializeBillingMode() throws LeasingException {
final String shardId = "shd-0-0";
final String owner = "foo:8000";
TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher);
builder.withLease(shardId, owner);
Map<String, Lease> leases = builder.build();
DynamoDBLeaseRenewer renewer = new DynamoDBLeaseRenewer(leaseRefresher, owner, 30000L,
Executors.newCachedThreadPool(), new NullMetricsFactory());
renewer.initialize();
Map<String, Lease> heldLeases = renewer.getCurrentlyHeldLeases();
assertThat(heldLeases.size(), equalTo(leases.size()));
assertThat(heldLeases.keySet(), equalTo(leases.keySet()));
}
}

View file

@ -262,4 +262,19 @@ public class DynamoDBLeaseRenewerIntegrationTest extends LeaseIntegrationTest {
assertThat(heldLeases.size(), equalTo(leases.size()));
assertThat(heldLeases.keySet(), equalTo(leases.keySet()));
}
@Test
public void testInitializeBillingMode() throws LeasingException {
final String shardId = "shd-0-0";
final String owner = "foo:8000";
TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher);
builder.withLease(shardId, owner);
Map<String, Lease> leases = builder.build();
DynamoDBLeaseRenewer renewer = new DynamoDBLeaseRenewer(leaseRefresher, owner, 30000L,
Executors.newCachedThreadPool(), new NullMetricsFactory());
renewer.initialize();
Map<String, Lease> heldLeases = renewer.getCurrentlyHeldLeases();
assertThat(heldLeases.size(), equalTo(leases.size()));
assertThat(heldLeases.keySet(), equalTo(leases.keySet()));
}
}