diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java index dd18ec2f..89e6a3bf 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java @@ -196,7 +196,7 @@ public class LeaseManagementConfig { private Duration dynamoDbRequestTimeout = DEFAULT_REQUEST_TIMEOUT; - private BillingMode billingMode = BillingMode.PROVISIONED; + private BillingMode billingMode = BillingMode.PAY_PER_REQUEST; /** * Frequency (in millis) of the auditor job to scan for partial leases in the lease table. diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java index b7f38a4e..769bf3ad 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java @@ -30,16 +30,13 @@ public interface LeaseRefresher { /** * Creates the table that will store leases. Succeeds if table already exists. * - * @param readCapacity - * @param writeCapacity - * * @return true if we created a new table (table didn't exist before) * * @throws ProvisionedThroughputException if we cannot create the lease table due to per-AWS-account capacity * restrictions. * @throws DependencyException if DynamoDB createTable fails in an unexpected way */ - boolean createLeaseTableIfNotExists(Long readCapacity, Long writeCapacity) + boolean createLeaseTableIfNotExists() throws ProvisionedThroughputException, DependencyException; /** diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java index 78673f66..76f91800 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java @@ -211,10 +211,9 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator { @Override public void initialize() throws ProvisionedThroughputException, DependencyException, IllegalStateException { final boolean newTableCreated = - leaseRefresher.createLeaseTableIfNotExists(initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity); + leaseRefresher.createLeaseTableIfNotExists(); if (newTableCreated) { - log.info("Created new lease table for coordinator with initial read capacity of {} and write capacity of {}.", - initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity); + log.info("Created new lease table for coordinator with pay per request billing mode."); } // Need to wait for table in active state. final long secondsBetweenPolls = 10L; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java index 5102bc5e..ad1a2300 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java @@ -285,7 +285,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis, maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds, cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity, - hierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, BillingMode.PROVISIONED); + hierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, BillingMode.PAY_PER_REQUEST); } /** diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java index 8002eacc..015d94e3 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java @@ -38,7 +38,6 @@ import software.amazon.awssdk.services.dynamodb.model.DynamoDbException; import software.amazon.awssdk.services.dynamodb.model.GetItemRequest; import software.amazon.awssdk.services.dynamodb.model.GetItemResponse; import software.amazon.awssdk.services.dynamodb.model.LimitExceededException; -import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput; import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughputExceededException; import software.amazon.awssdk.services.dynamodb.model.PutItemRequest; import software.amazon.awssdk.services.dynamodb.model.ResourceInUseException; @@ -130,7 +129,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { public DynamoDBLeaseRefresher(final String table, final DynamoDbAsyncClient dynamoDBClient, final LeaseSerializer serializer, final boolean consistentReads, @NonNull final TableCreatorCallback tableCreatorCallback, Duration dynamoDbRequestTimeout) { - this(table, dynamoDBClient, serializer, consistentReads, tableCreatorCallback, dynamoDbRequestTimeout, BillingMode.PROVISIONED); + this(table, dynamoDBClient, serializer, consistentReads, tableCreatorCallback, dynamoDbRequestTimeout, BillingMode.PAY_PER_REQUEST); } /** @@ -160,7 +159,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { * {@inheritDoc} */ @Override - public boolean createLeaseTableIfNotExists(@NonNull final Long readCapacity, @NonNull final Long writeCapacity) + public boolean createLeaseTableIfNotExists() throws ProvisionedThroughputException, DependencyException { try { if (tableStatus() != null) { @@ -172,19 +171,9 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { // log.error("Failed to get table status for {}", table, de); } - ProvisionedThroughput throughput = ProvisionedThroughput.builder().readCapacityUnits(readCapacity) - .writeCapacityUnits(writeCapacity).build(); - final CreateTableRequest request; - if(BillingMode.PAY_PER_REQUEST.equals(billingMode)){ - request = CreateTableRequest.builder().tableName(table).keySchema(serializer.getKeySchema()) - .attributeDefinitions(serializer.getAttributeDefinitions()) - .billingMode(billingMode).build(); - }else{ - request = CreateTableRequest.builder().tableName(table).keySchema(serializer.getKeySchema()) - .attributeDefinitions(serializer.getAttributeDefinitions()).provisionedThroughput(throughput) - .build(); - } - + final CreateTableRequest request = CreateTableRequest.builder().tableName(table).keySchema(serializer.getKeySchema()) + .attributeDefinitions(serializer.getAttributeDefinitions()) + .billingMode(billingMode).build(); final AWSExceptionManager exceptionManager = createExceptionManager(); exceptionManager.add(ResourceInUseException.class, t -> t); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ExceptionThrowingLeaseRefresher.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ExceptionThrowingLeaseRefresher.java index 81a49839..d857eede 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ExceptionThrowingLeaseRefresher.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ExceptionThrowingLeaseRefresher.java @@ -109,12 +109,12 @@ public class ExceptionThrowingLeaseRefresher implements LeaseRefresher { } @Override - public boolean createLeaseTableIfNotExists(Long readCapacity, Long writeCapacity) + public boolean createLeaseTableIfNotExists() throws ProvisionedThroughputException, DependencyException { throwExceptions("createLeaseTableIfNotExists", ExceptionThrowingLeaseRefresherMethods.CREATELEASETABLEIFNOTEXISTS); - return leaseRefresher.createLeaseTableIfNotExists(readCapacity, writeCapacity); + return leaseRefresher.createLeaseTableIfNotExists(); } @Override diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCoordinatorExerciser.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCoordinatorExerciser.java index 2de34649..186fe290 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCoordinatorExerciser.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCoordinatorExerciser.java @@ -68,8 +68,7 @@ public class LeaseCoordinatorExerciser { LeaseRefresher leaseRefresher = new DynamoDBLeaseRefresher("nagl_ShardProgress", dynamoDBClient, new DynamoDBLeaseSerializer(), true, TableCreatorCallback.NOOP_TABLE_CREATOR_CALLBACK); - if (leaseRefresher.createLeaseTableIfNotExists(INITIAL_LEASE_TABLE_READ_CAPACITY, - INITIAL_LEASE_TABLE_WRITE_CAPACITY)) { + if (leaseRefresher.createLeaseTableIfNotExists()) { log.info("Waiting for newly created lease table"); if (!leaseRefresher.waitUntilLeaseTableExists(10, 300)) { log.error("Table was not created in time"); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseIntegrationBillingModePayPerRequestTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseIntegrationBillingModePayPerRequestTest.java deleted file mode 100644 index dd069b19..00000000 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseIntegrationBillingModePayPerRequestTest.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright 2019 Amazon.com, Inc. or its affiliates. - * Licensed under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package software.amazon.kinesis.leases; - -import lombok.extern.slf4j.Slf4j; -import org.junit.Rule; -import org.junit.rules.TestWatcher; -import org.junit.runner.Description; -import org.mockito.Mock; -import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; -import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; -import software.amazon.awssdk.services.dynamodb.model.BillingMode; -import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRefresher; -import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseSerializer; -import software.amazon.kinesis.leases.dynamodb.TableCreatorCallback; - -@Slf4j -public class LeaseIntegrationBillingModePayPerRequestTest extends LeaseIntegrationTest { - @Override - protected DynamoDBLeaseRefresher getLeaseRefresher() { - return new DynamoDBLeaseRefresher(tableName+"Per-Request", ddbClient, leaseSerializer, true, - tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, BillingMode.PAY_PER_REQUEST); - } -} - diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseIntegrationTest.java index 21d4e932..3245be91 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseIntegrationTest.java @@ -54,7 +54,7 @@ public class LeaseIntegrationTest { try { if (!leaseRefresher.leaseTableExists()) { log.info("Creating lease table"); - leaseRefresher.createLeaseTableIfNotExists(10L, 10L); + leaseRefresher.createLeaseTableIfNotExists(); leaseRefresher.waitUntilLeaseTableExists(10, 500); } @@ -74,7 +74,7 @@ public class LeaseIntegrationTest { protected DynamoDBLeaseRefresher getLeaseRefresher() { return new DynamoDBLeaseRefresher(tableName, ddbClient, leaseSerializer, true, - tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, BillingMode.PROVISIONED); + tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, BillingMode.PAY_PER_REQUEST); } } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardSyncTaskIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardSyncTaskIntegrationTest.java index ce6ce386..102242bc 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardSyncTaskIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardSyncTaskIntegrationTest.java @@ -111,9 +111,7 @@ public class ShardSyncTaskIntegrationTest { @Test public final void testCall() throws DependencyException, InvalidStateException, ProvisionedThroughputException { if (!leaseRefresher.leaseTableExists()) { - final Long readCapacity = 10L; - final Long writeCapacity = 10L; - leaseRefresher.createLeaseTableIfNotExists(readCapacity, writeCapacity); + leaseRefresher.createLeaseTableIfNotExists(); } leaseRefresher.deleteAll(); Set shardIds = shardDetector.listShards().stream().map(Shard::shardId).collect(Collectors.toSet()); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorIntegrationTest.java index d89c010e..27400a3c 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorIntegrationTest.java @@ -78,7 +78,7 @@ public class DynamoDBLeaseCoordinatorIntegrationTest { leaseRefresher = new DynamoDBLeaseRefresher(TABLE_NAME, dynamoDBClient, new DynamoDBLeaseSerializer(), useConsistentReads, TableCreatorCallback.NOOP_TABLE_CREATOR_CALLBACK); } - leaseRefresher.createLeaseTableIfNotExists(10L, 10L); + leaseRefresher.createLeaseTableIfNotExists(); int retryLeft = ATTEMPTS; diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java index 5188c41b..cb212ca3 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java @@ -275,7 +275,7 @@ public class DynamoDBLeaseRefresherTest { when(dynamoDbClient.createTable(any(CreateTableRequest.class))).thenReturn(mockCreateTableFuture); when(mockCreateTableFuture.get(anyLong(), any())).thenThrow(te); - verifyCancel(mockCreateTableFuture, () -> leaseRefresher.createLeaseTableIfNotExists(10L, 10L)); + verifyCancel(mockCreateTableFuture, () -> leaseRefresher.createLeaseTableIfNotExists()); } @Test @@ -292,7 +292,7 @@ public class DynamoDBLeaseRefresherTest { when(dynamoDbClient.createTable(any(CreateTableRequest.class))).thenReturn(mockCreateTableFuture); when(mockCreateTableFuture.get(anyLong(), any())).thenThrow(te); - verifyCancel(mockCreateTableFuture, () -> leaseRefresher.createLeaseTableIfNotExists(10L, 10L)); + verifyCancel(mockCreateTableFuture, () -> leaseRefresher.createLeaseTableIfNotExists()); } @FunctionalInterface diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerBillingModePayPerRequestIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerBillingModePayPerRequestIntegrationTest.java deleted file mode 100644 index 1dad013e..00000000 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerBillingModePayPerRequestIntegrationTest.java +++ /dev/null @@ -1,281 +0,0 @@ -/* - * Copyright 2019 Amazon.com, Inc. or its affiliates. - * Licensed under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package software.amazon.kinesis.leases.dynamodb; - -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.runners.MockitoJUnitRunner; -import software.amazon.kinesis.leases.Lease; -import software.amazon.kinesis.leases.LeaseIntegrationBillingModePayPerRequestTest; -import software.amazon.kinesis.leases.LeaseRenewer; -import software.amazon.kinesis.leases.exceptions.LeasingException; -import software.amazon.kinesis.metrics.NullMetricsFactory; -import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; - -import java.util.Collections; -import java.util.Map; -import java.util.concurrent.Executors; - -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.CoreMatchers.notNullValue; -import static org.hamcrest.CoreMatchers.nullValue; -import static org.junit.Assert.assertThat; - -@RunWith(MockitoJUnitRunner.class) -public class DynamoDBLeaseRenewerBillingModePayPerRequestIntegrationTest extends - LeaseIntegrationBillingModePayPerRequestTest { - private final String TEST_METRIC = "TestOperation"; - - // This test case's leases last 2 seconds - private static final long LEASE_DURATION_MILLIS = 2000L; - - private LeaseRenewer renewer; - - @Before - public void setup() { - renewer = new DynamoDBLeaseRenewer(leaseRefresher, "foo", LEASE_DURATION_MILLIS, - Executors.newCachedThreadPool(), new NullMetricsFactory()); - } - - @Test - public void testSimpleRenew() throws LeasingException { - TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher); - - builder.withLease("1", "foo").build(); - - builder.addLeasesToRenew(renewer, "1"); - builder.renewMutateAssert(renewer, "1"); - } - - @Test - public void testLeaseLoss() throws LeasingException { - TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher); - - builder.withLease("1", "foo").withLease("2", "foo").build(); - - builder.addLeasesToRenew(renewer, "1", "2"); - Lease renewedLease = builder.renewMutateAssert(renewer, "1", "2").get("2"); - - // lose lease 2 - leaseRefresher.takeLease(renewedLease, "bar"); - - builder.renewMutateAssert(renewer, "1"); - } - - @Test - public void testClear() throws LeasingException { - TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher); - - builder.withLease("1", "foo").build(); - builder.addLeasesToRenew(renewer, "1"); - builder.renewMutateAssert(renewer, "1"); - - renewer.clearCurrentlyHeldLeases(); - builder.renewMutateAssert(renewer); - } - - @Test - public void testGetCurrentlyHeldLease() throws LeasingException { - TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher); - - builder.withLease("1", "foo").build(); - builder.addLeasesToRenew(renewer, "1"); - builder.renewMutateAssert(renewer, "1"); - - // this should be a copy that doesn't get updated - Lease lease = renewer.getCurrentlyHeldLease("1"); - assertThat(lease.leaseCounter(), equalTo(1L)); - - // do one renewal and make sure the old copy doesn't get updated - builder.renewMutateAssert(renewer, "1"); - - assertThat(lease.leaseCounter(), equalTo(1L)); - } - - @Test - public void testGetCurrentlyHeldLeases() throws LeasingException { - TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher); - - builder.withLease("1", "foo").withLease("2", "foo").build(); - builder.addLeasesToRenew(renewer, "1", "2"); - Lease lease2 = builder.renewMutateAssert(renewer, "1", "2").get("2"); - - // This should be a copy that doesn't get updated - Map heldLeases = renewer.getCurrentlyHeldLeases(); - assertThat(heldLeases.size(), equalTo(2)); - assertThat(heldLeases.get("1").leaseCounter(), equalTo(1L)); - assertThat(heldLeases.get("2").leaseCounter(), equalTo(1L)); - - // lose lease 2 - leaseRefresher.takeLease(lease2, "bar"); - - // Do another renewal and make sure the copy doesn't change - builder.renewMutateAssert(renewer, "1"); - - assertThat(heldLeases.size(), equalTo(2)); - assertThat(heldLeases.get("1").leaseCounter(), equalTo(1L)); - assertThat(heldLeases.get("2").leaseCounter(), equalTo(1L)); - } - - @Test - public void testUpdateLease() throws LeasingException { - TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher); - - builder.withLease("1", "foo").build(); - - builder.addLeasesToRenew(renewer, "1"); - builder.renewMutateAssert(renewer, "1"); - - Lease expected = renewer.getCurrentlyHeldLease("1"); - expected.checkpoint(new ExtendedSequenceNumber("new checkpoint")); - assertThat(renewer.updateLease(expected, expected.concurrencyToken(), TEST_METRIC, expected.leaseKey()), - equalTo(true)); - - // Assert that the counter and data have changed immediately after the update... - Lease actual = renewer.getCurrentlyHeldLease("1"); - expected.leaseCounter(expected.leaseCounter() + 1); - assertThat(actual, equalTo(expected)); - - // ...and after another round of renewal - renewer.renewLeases(); - actual = renewer.getCurrentlyHeldLease("1"); - expected.leaseCounter(expected.leaseCounter() + 1); - assertThat(actual, equalTo(expected)); - } - - @Test - public void testUpdateLostLease() throws LeasingException { - TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher); - - builder.withLease("1", "foo").build(); - - builder.addLeasesToRenew(renewer, "1"); - builder.renewMutateAssert(renewer, "1"); - - Lease lease = renewer.getCurrentlyHeldLease("1"); - - // cause lease loss such that the renewer doesn't realize he's lost the lease when update is called - leaseRefresher.renewLease(lease); - - // renewer still thinks he has the lease - assertThat(renewer.getCurrentlyHeldLease("1"), notNullValue()); - lease.checkpoint(new ExtendedSequenceNumber("new checkpoint")); - - // update fails - assertThat(renewer.updateLease(lease, lease.concurrencyToken(), TEST_METRIC, null), equalTo(false)); - // renewer no longer thinks he has the lease - assertThat(renewer.getCurrentlyHeldLease("1"), nullValue()); - } - - @Test - public void testUpdateOldLease() throws LeasingException { - TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher); - - builder.withLease("1", "foo").build(); - - builder.addLeasesToRenew(renewer, "1"); - builder.renewMutateAssert(renewer, "1"); - - Lease lease = renewer.getCurrentlyHeldLease("1"); - - // cause lease loss such that the renewer knows the lease has been lost when update is called - leaseRefresher.takeLease(lease, "bar"); - builder.renewMutateAssert(renewer); - - lease.checkpoint(new ExtendedSequenceNumber("new checkpoint")); - assertThat(renewer.updateLease(lease, lease.concurrencyToken(), TEST_METRIC, lease.leaseKey()), equalTo(false)); - } - - @Test - public void testUpdateRegainedLease() throws LeasingException { - TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher); - - builder.withLease("1", "foo").build(); - - builder.addLeasesToRenew(renewer, "1"); - builder.renewMutateAssert(renewer, "1"); - - Lease lease = renewer.getCurrentlyHeldLease("1"); - - // cause lease loss such that the renewer knows the lease has been lost when update is called - leaseRefresher.takeLease(lease, "bar"); - builder.renewMutateAssert(renewer); - - // regain the lease - builder.addLeasesToRenew(renewer, "1"); - - lease.checkpoint(new ExtendedSequenceNumber("new checkpoint")); - assertThat(renewer.updateLease(lease, lease.concurrencyToken(), TEST_METRIC, lease.leaseKey()), equalTo(false)); - } - - @Test - public void testIgnoreNoRenewalTimestamp() throws LeasingException { - TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher); - - Lease lease = builder.withLease("1", "foo").build().get("1"); - lease.lastCounterIncrementNanos(null); - - renewer.addLeasesToRenew(Collections.singleton(lease)); - - assertThat(renewer.getCurrentlyHeldLeases().size(), equalTo(0)); - } - - @Test - public void testLeaseTimeout() throws LeasingException, InterruptedException { - TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher); - - builder.withLease("1", "foo").build(); - - builder.addLeasesToRenew(renewer, "1"); - builder.renewMutateAssert(renewer, "1"); - - // TODO: Worth eliminating this sleep using the same pattern we used on LeaseTaker? - Thread.sleep(LEASE_DURATION_MILLIS); // Wait for the lease to timeout - - assertThat(renewer.getCurrentlyHeldLeases().size(), equalTo(0)); - } - - @Test - public void testInitialize() throws LeasingException { - final String shardId = "shd-0-0"; - final String owner = "foo:8000"; - - TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher); - builder.withLease(shardId, owner); - Map leases = builder.build(); - DynamoDBLeaseRenewer renewer = new DynamoDBLeaseRenewer(leaseRefresher, owner, 30000L, - Executors.newCachedThreadPool(), new NullMetricsFactory()); - renewer.initialize(); - Map heldLeases = renewer.getCurrentlyHeldLeases(); - assertThat(heldLeases.size(), equalTo(leases.size())); - assertThat(heldLeases.keySet(), equalTo(leases.keySet())); - } - - @Test - public void testInitializeBillingMode() throws LeasingException { - final String shardId = "shd-0-0"; - final String owner = "foo:8000"; - TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher); - builder.withLease(shardId, owner); - Map leases = builder.build(); - DynamoDBLeaseRenewer renewer = new DynamoDBLeaseRenewer(leaseRefresher, owner, 30000L, - Executors.newCachedThreadPool(), new NullMetricsFactory()); - renewer.initialize(); - Map heldLeases = renewer.getCurrentlyHeldLeases(); - assertThat(heldLeases.size(), equalTo(leases.size())); - assertThat(heldLeases.keySet(), equalTo(leases.keySet())); - } -}