From 07c845b93aa686e33e131e53d850431a32e95ddf Mon Sep 17 00:00:00 2001 From: Sachin Sundar P S Date: Tue, 31 Aug 2021 13:50:56 -0700 Subject: [PATCH] Keep previous table creation function for backward compatibility. --- .../amazon/kinesis/leases/LeaseRefresher.java | 17 ++ .../dynamodb/DynamoDBLeaseRefresher.java | 10 + .../ExceptionThrowingLeaseRefresher.java | 6 + ...tegrationBillingModePayPerRequestTest.java | 36 +++ .../kinesis/leases/LeaseIntegrationTest.java | 2 +- .../leases/ShardSyncTaskIntegrationTest.java | 4 +- ...namoDBLeaseCoordinatorIntegrationTest.java | 2 +- .../dynamodb/DynamoDBLeaseRefresherTest.java | 4 +- ...llingModePayPerRequestIntegrationTest.java | 281 ++++++++++++++++++ 9 files changed, 357 insertions(+), 5 deletions(-) create mode 100644 amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseIntegrationBillingModePayPerRequestTest.java create mode 100644 amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerBillingModePayPerRequestIntegrationTest.java diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java index 769bf3ad..37496afa 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java @@ -27,6 +27,23 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; */ public interface LeaseRefresher { + /** + * Creates the table that will store leases. Succeeds if table already exists. + * Deprecated. Use createLeaseTableIfNotExists(). + * + * @param readCapacity + * @param writeCapacity + * + * @return true if we created a new table (table didn't exist before) + * + * @throws ProvisionedThroughputException if we cannot create the lease table due to per-AWS-account capacity + * restrictions. + * @throws DependencyException if DynamoDB createTable fails in an unexpected way + */ + @Deprecated + boolean createLeaseTableIfNotExists(Long readCapacity, Long writeCapacity) + throws ProvisionedThroughputException, DependencyException; + /** * Creates the table that will store leases. Succeeds if table already exists. * diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java index 015d94e3..8290aefd 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java @@ -155,6 +155,16 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { this.billingMode = billingMode; } + /** + * {@inheritDoc} + */ + @Override + public boolean createLeaseTableIfNotExists(@NonNull final Long readCapacity, @NonNull final Long writeCapacity) + throws ProvisionedThroughputException, DependencyException { + // DynamoDB is now created in PayPerRequest billing mode by default. Keeping this for backward compatibility. + return createLeaseTableIfNotExists(); + } + /** * {@inheritDoc} */ diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ExceptionThrowingLeaseRefresher.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ExceptionThrowingLeaseRefresher.java index d857eede..0d418ca3 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ExceptionThrowingLeaseRefresher.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ExceptionThrowingLeaseRefresher.java @@ -108,6 +108,12 @@ public class ExceptionThrowingLeaseRefresher implements LeaseRefresher { } } + @Override + public boolean createLeaseTableIfNotExists(Long readCapacity, Long writeCapacity) + throws ProvisionedThroughputException, DependencyException { + return createLeaseTableIfNotExists(); + } + @Override public boolean createLeaseTableIfNotExists() throws ProvisionedThroughputException, DependencyException { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseIntegrationBillingModePayPerRequestTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseIntegrationBillingModePayPerRequestTest.java new file mode 100644 index 00000000..9f7735f9 --- /dev/null +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseIntegrationBillingModePayPerRequestTest.java @@ -0,0 +1,36 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package software.amazon.kinesis.leases; + +import lombok.extern.slf4j.Slf4j; +import org.junit.Rule; +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; +import org.mockito.Mock; +import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; +import software.amazon.awssdk.services.dynamodb.model.BillingMode; +import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRefresher; +import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseSerializer; +import software.amazon.kinesis.leases.dynamodb.TableCreatorCallback; + +@Slf4j +public class LeaseIntegrationBillingModePayPerRequestTest extends LeaseIntegrationTest { + @Override + protected DynamoDBLeaseRefresher getLeaseRefresher() { + return new DynamoDBLeaseRefresher(tableName+"Per-Request", ddbClient, leaseSerializer, true, + tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, BillingMode.PAY_PER_REQUEST); + } +} diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseIntegrationTest.java index 3245be91..fd5106e4 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseIntegrationTest.java @@ -54,7 +54,7 @@ public class LeaseIntegrationTest { try { if (!leaseRefresher.leaseTableExists()) { log.info("Creating lease table"); - leaseRefresher.createLeaseTableIfNotExists(); + leaseRefresher.createLeaseTableIfNotExists(10L, 10L); leaseRefresher.waitUntilLeaseTableExists(10, 500); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardSyncTaskIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardSyncTaskIntegrationTest.java index 102242bc..ce6ce386 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardSyncTaskIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardSyncTaskIntegrationTest.java @@ -111,7 +111,9 @@ public class ShardSyncTaskIntegrationTest { @Test public final void testCall() throws DependencyException, InvalidStateException, ProvisionedThroughputException { if (!leaseRefresher.leaseTableExists()) { - leaseRefresher.createLeaseTableIfNotExists(); + final Long readCapacity = 10L; + final Long writeCapacity = 10L; + leaseRefresher.createLeaseTableIfNotExists(readCapacity, writeCapacity); } leaseRefresher.deleteAll(); Set shardIds = shardDetector.listShards().stream().map(Shard::shardId).collect(Collectors.toSet()); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorIntegrationTest.java index 27400a3c..d89c010e 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorIntegrationTest.java @@ -78,7 +78,7 @@ public class DynamoDBLeaseCoordinatorIntegrationTest { leaseRefresher = new DynamoDBLeaseRefresher(TABLE_NAME, dynamoDBClient, new DynamoDBLeaseSerializer(), useConsistentReads, TableCreatorCallback.NOOP_TABLE_CREATOR_CALLBACK); } - leaseRefresher.createLeaseTableIfNotExists(); + leaseRefresher.createLeaseTableIfNotExists(10L, 10L); int retryLeft = ATTEMPTS; diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java index cb212ca3..5188c41b 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java @@ -275,7 +275,7 @@ public class DynamoDBLeaseRefresherTest { when(dynamoDbClient.createTable(any(CreateTableRequest.class))).thenReturn(mockCreateTableFuture); when(mockCreateTableFuture.get(anyLong(), any())).thenThrow(te); - verifyCancel(mockCreateTableFuture, () -> leaseRefresher.createLeaseTableIfNotExists()); + verifyCancel(mockCreateTableFuture, () -> leaseRefresher.createLeaseTableIfNotExists(10L, 10L)); } @Test @@ -292,7 +292,7 @@ public class DynamoDBLeaseRefresherTest { when(dynamoDbClient.createTable(any(CreateTableRequest.class))).thenReturn(mockCreateTableFuture); when(mockCreateTableFuture.get(anyLong(), any())).thenThrow(te); - verifyCancel(mockCreateTableFuture, () -> leaseRefresher.createLeaseTableIfNotExists()); + verifyCancel(mockCreateTableFuture, () -> leaseRefresher.createLeaseTableIfNotExists(10L, 10L)); } @FunctionalInterface diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerBillingModePayPerRequestIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerBillingModePayPerRequestIntegrationTest.java new file mode 100644 index 00000000..1dad013e --- /dev/null +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerBillingModePayPerRequestIntegrationTest.java @@ -0,0 +1,281 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package software.amazon.kinesis.leases.dynamodb; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.runners.MockitoJUnitRunner; +import software.amazon.kinesis.leases.Lease; +import software.amazon.kinesis.leases.LeaseIntegrationBillingModePayPerRequestTest; +import software.amazon.kinesis.leases.LeaseRenewer; +import software.amazon.kinesis.leases.exceptions.LeasingException; +import software.amazon.kinesis.metrics.NullMetricsFactory; +import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; + +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.Executors; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.junit.Assert.assertThat; + +@RunWith(MockitoJUnitRunner.class) +public class DynamoDBLeaseRenewerBillingModePayPerRequestIntegrationTest extends + LeaseIntegrationBillingModePayPerRequestTest { + private final String TEST_METRIC = "TestOperation"; + + // This test case's leases last 2 seconds + private static final long LEASE_DURATION_MILLIS = 2000L; + + private LeaseRenewer renewer; + + @Before + public void setup() { + renewer = new DynamoDBLeaseRenewer(leaseRefresher, "foo", LEASE_DURATION_MILLIS, + Executors.newCachedThreadPool(), new NullMetricsFactory()); + } + + @Test + public void testSimpleRenew() throws LeasingException { + TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher); + + builder.withLease("1", "foo").build(); + + builder.addLeasesToRenew(renewer, "1"); + builder.renewMutateAssert(renewer, "1"); + } + + @Test + public void testLeaseLoss() throws LeasingException { + TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher); + + builder.withLease("1", "foo").withLease("2", "foo").build(); + + builder.addLeasesToRenew(renewer, "1", "2"); + Lease renewedLease = builder.renewMutateAssert(renewer, "1", "2").get("2"); + + // lose lease 2 + leaseRefresher.takeLease(renewedLease, "bar"); + + builder.renewMutateAssert(renewer, "1"); + } + + @Test + public void testClear() throws LeasingException { + TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher); + + builder.withLease("1", "foo").build(); + builder.addLeasesToRenew(renewer, "1"); + builder.renewMutateAssert(renewer, "1"); + + renewer.clearCurrentlyHeldLeases(); + builder.renewMutateAssert(renewer); + } + + @Test + public void testGetCurrentlyHeldLease() throws LeasingException { + TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher); + + builder.withLease("1", "foo").build(); + builder.addLeasesToRenew(renewer, "1"); + builder.renewMutateAssert(renewer, "1"); + + // this should be a copy that doesn't get updated + Lease lease = renewer.getCurrentlyHeldLease("1"); + assertThat(lease.leaseCounter(), equalTo(1L)); + + // do one renewal and make sure the old copy doesn't get updated + builder.renewMutateAssert(renewer, "1"); + + assertThat(lease.leaseCounter(), equalTo(1L)); + } + + @Test + public void testGetCurrentlyHeldLeases() throws LeasingException { + TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher); + + builder.withLease("1", "foo").withLease("2", "foo").build(); + builder.addLeasesToRenew(renewer, "1", "2"); + Lease lease2 = builder.renewMutateAssert(renewer, "1", "2").get("2"); + + // This should be a copy that doesn't get updated + Map heldLeases = renewer.getCurrentlyHeldLeases(); + assertThat(heldLeases.size(), equalTo(2)); + assertThat(heldLeases.get("1").leaseCounter(), equalTo(1L)); + assertThat(heldLeases.get("2").leaseCounter(), equalTo(1L)); + + // lose lease 2 + leaseRefresher.takeLease(lease2, "bar"); + + // Do another renewal and make sure the copy doesn't change + builder.renewMutateAssert(renewer, "1"); + + assertThat(heldLeases.size(), equalTo(2)); + assertThat(heldLeases.get("1").leaseCounter(), equalTo(1L)); + assertThat(heldLeases.get("2").leaseCounter(), equalTo(1L)); + } + + @Test + public void testUpdateLease() throws LeasingException { + TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher); + + builder.withLease("1", "foo").build(); + + builder.addLeasesToRenew(renewer, "1"); + builder.renewMutateAssert(renewer, "1"); + + Lease expected = renewer.getCurrentlyHeldLease("1"); + expected.checkpoint(new ExtendedSequenceNumber("new checkpoint")); + assertThat(renewer.updateLease(expected, expected.concurrencyToken(), TEST_METRIC, expected.leaseKey()), + equalTo(true)); + + // Assert that the counter and data have changed immediately after the update... + Lease actual = renewer.getCurrentlyHeldLease("1"); + expected.leaseCounter(expected.leaseCounter() + 1); + assertThat(actual, equalTo(expected)); + + // ...and after another round of renewal + renewer.renewLeases(); + actual = renewer.getCurrentlyHeldLease("1"); + expected.leaseCounter(expected.leaseCounter() + 1); + assertThat(actual, equalTo(expected)); + } + + @Test + public void testUpdateLostLease() throws LeasingException { + TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher); + + builder.withLease("1", "foo").build(); + + builder.addLeasesToRenew(renewer, "1"); + builder.renewMutateAssert(renewer, "1"); + + Lease lease = renewer.getCurrentlyHeldLease("1"); + + // cause lease loss such that the renewer doesn't realize he's lost the lease when update is called + leaseRefresher.renewLease(lease); + + // renewer still thinks he has the lease + assertThat(renewer.getCurrentlyHeldLease("1"), notNullValue()); + lease.checkpoint(new ExtendedSequenceNumber("new checkpoint")); + + // update fails + assertThat(renewer.updateLease(lease, lease.concurrencyToken(), TEST_METRIC, null), equalTo(false)); + // renewer no longer thinks he has the lease + assertThat(renewer.getCurrentlyHeldLease("1"), nullValue()); + } + + @Test + public void testUpdateOldLease() throws LeasingException { + TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher); + + builder.withLease("1", "foo").build(); + + builder.addLeasesToRenew(renewer, "1"); + builder.renewMutateAssert(renewer, "1"); + + Lease lease = renewer.getCurrentlyHeldLease("1"); + + // cause lease loss such that the renewer knows the lease has been lost when update is called + leaseRefresher.takeLease(lease, "bar"); + builder.renewMutateAssert(renewer); + + lease.checkpoint(new ExtendedSequenceNumber("new checkpoint")); + assertThat(renewer.updateLease(lease, lease.concurrencyToken(), TEST_METRIC, lease.leaseKey()), equalTo(false)); + } + + @Test + public void testUpdateRegainedLease() throws LeasingException { + TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher); + + builder.withLease("1", "foo").build(); + + builder.addLeasesToRenew(renewer, "1"); + builder.renewMutateAssert(renewer, "1"); + + Lease lease = renewer.getCurrentlyHeldLease("1"); + + // cause lease loss such that the renewer knows the lease has been lost when update is called + leaseRefresher.takeLease(lease, "bar"); + builder.renewMutateAssert(renewer); + + // regain the lease + builder.addLeasesToRenew(renewer, "1"); + + lease.checkpoint(new ExtendedSequenceNumber("new checkpoint")); + assertThat(renewer.updateLease(lease, lease.concurrencyToken(), TEST_METRIC, lease.leaseKey()), equalTo(false)); + } + + @Test + public void testIgnoreNoRenewalTimestamp() throws LeasingException { + TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher); + + Lease lease = builder.withLease("1", "foo").build().get("1"); + lease.lastCounterIncrementNanos(null); + + renewer.addLeasesToRenew(Collections.singleton(lease)); + + assertThat(renewer.getCurrentlyHeldLeases().size(), equalTo(0)); + } + + @Test + public void testLeaseTimeout() throws LeasingException, InterruptedException { + TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher); + + builder.withLease("1", "foo").build(); + + builder.addLeasesToRenew(renewer, "1"); + builder.renewMutateAssert(renewer, "1"); + + // TODO: Worth eliminating this sleep using the same pattern we used on LeaseTaker? + Thread.sleep(LEASE_DURATION_MILLIS); // Wait for the lease to timeout + + assertThat(renewer.getCurrentlyHeldLeases().size(), equalTo(0)); + } + + @Test + public void testInitialize() throws LeasingException { + final String shardId = "shd-0-0"; + final String owner = "foo:8000"; + + TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher); + builder.withLease(shardId, owner); + Map leases = builder.build(); + DynamoDBLeaseRenewer renewer = new DynamoDBLeaseRenewer(leaseRefresher, owner, 30000L, + Executors.newCachedThreadPool(), new NullMetricsFactory()); + renewer.initialize(); + Map heldLeases = renewer.getCurrentlyHeldLeases(); + assertThat(heldLeases.size(), equalTo(leases.size())); + assertThat(heldLeases.keySet(), equalTo(leases.keySet())); + } + + @Test + public void testInitializeBillingMode() throws LeasingException { + final String shardId = "shd-0-0"; + final String owner = "foo:8000"; + TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher); + builder.withLease(shardId, owner); + Map leases = builder.build(); + DynamoDBLeaseRenewer renewer = new DynamoDBLeaseRenewer(leaseRefresher, owner, 30000L, + Executors.newCachedThreadPool(), new NullMetricsFactory()); + renewer.initialize(); + Map heldLeases = renewer.getCurrentlyHeldLeases(); + assertThat(heldLeases.size(), equalTo(leases.size())); + assertThat(heldLeases.keySet(), equalTo(leases.keySet())); + } +}