diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java index a5f088ce..20e0aa8f 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java @@ -28,6 +28,7 @@ import lombok.Data; import lombok.NonNull; import lombok.experimental.Accessors; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; +import software.amazon.awssdk.services.dynamodb.model.BillingMode; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; @@ -165,6 +166,8 @@ public class LeaseManagementConfig { private Duration dynamoDbRequestTimeout = DEFAULT_REQUEST_TIMEOUT; + private BillingMode billingMode = BillingMode.PROVISIONED; + /** * The initial position for getting records from Kinesis streams. * @@ -267,7 +270,7 @@ public class LeaseManagementConfig { initialLeaseTableReadCapacity(), initialLeaseTableWriteCapacity(), hierarchicalShardSyncer(), - tableCreatorCallback(), dynamoDbRequestTimeout()); + tableCreatorCallback(), dynamoDbRequestTimeout(), billingMode()); } return leaseManagementFactory; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java index c773aab2..c2ade429 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java @@ -21,6 +21,7 @@ import java.util.concurrent.ExecutorService; import lombok.Data; import lombok.NonNull; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; +import software.amazon.awssdk.services.dynamodb.model.BillingMode; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.common.InitialPositionInStreamExtended; @@ -75,6 +76,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { private final long initialLeaseTableWriteCapacity; private final TableCreatorCallback tableCreatorCallback; private final Duration dynamoDbRequestTimeout; + private final BillingMode billingMode; /** * Constructor. @@ -254,6 +256,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { * @param tableCreatorCallback * @param dynamoDbRequestTimeout */ + @Deprecated public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final String streamName, final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier, final ExecutorService executorService, final InitialPositionInStreamExtended initialPositionInStream, @@ -266,6 +269,58 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity, final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback, Duration dynamoDbRequestTimeout) { + this(kinesisClient, streamName, dynamoDBClient, tableName, workerIdentifier, executorService, + initialPositionInStream, failoverTimeMillis, epsilonMillis, maxLeasesForWorker, + maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion, + ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis, + maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds, + cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity, + hierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, BillingMode.PROVISIONED); + } + + /** + * Constructor. + * + * @param kinesisClient + * @param streamName + * @param dynamoDBClient + * @param tableName + * @param workerIdentifier + * @param executorService + * @param initialPositionInStream + * @param failoverTimeMillis + * @param epsilonMillis + * @param maxLeasesForWorker + * @param maxLeasesToStealAtOneTime + * @param maxLeaseRenewalThreads + * @param cleanupLeasesUponShardCompletion + * @param ignoreUnexpectedChildShards + * @param shardSyncIntervalMillis + * @param consistentReads + * @param listShardsBackoffTimeMillis + * @param maxListShardsRetryAttempts + * @param maxCacheMissesBeforeReload + * @param listShardsCacheAllowedAgeInSeconds + * @param cacheMissWarningModulus + * @param initialLeaseTableReadCapacity + * @param initialLeaseTableWriteCapacity + * @param hierarchicalShardSyncer + * @param tableCreatorCallback + * @param dynamoDbRequestTimeout + * @param billingMode + */ + public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final String streamName, + final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier, + final ExecutorService executorService, final InitialPositionInStreamExtended initialPositionInStream, + final long failoverTimeMillis, final long epsilonMillis, final int maxLeasesForWorker, + final int maxLeasesToStealAtOneTime, final int maxLeaseRenewalThreads, + final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards, + final long shardSyncIntervalMillis, final boolean consistentReads, final long listShardsBackoffTimeMillis, + final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload, + final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus, + final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity, + final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback, + Duration dynamoDbRequestTimeout, BillingMode billingMode) { this.kinesisClient = kinesisClient; this.streamName = streamName; this.dynamoDBClient = dynamoDBClient; @@ -292,6 +347,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { this.hierarchicalShardSyncer = hierarchicalShardSyncer; this.tableCreatorCallback = tableCreatorCallback; this.dynamoDbRequestTimeout = dynamoDbRequestTimeout; + this.billingMode = billingMode; } @Override @@ -324,7 +380,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { @Override public DynamoDBLeaseRefresher createLeaseRefresher() { return new DynamoDBLeaseRefresher(tableName, dynamoDBClient, new DynamoDBLeaseSerializer(), consistentReads, - tableCreatorCallback, dynamoDbRequestTimeout); + tableCreatorCallback, dynamoDbRequestTimeout, billingMode); } @Override diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java index 3520ed83..1c464afe 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java @@ -25,26 +25,7 @@ import java.util.concurrent.TimeoutException; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; -import software.amazon.awssdk.services.dynamodb.model.AttributeValue; -import software.amazon.awssdk.services.dynamodb.model.AttributeValueUpdate; -import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException; -import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest; -import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest; -import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest; -import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse; -import software.amazon.awssdk.services.dynamodb.model.DynamoDbException; -import software.amazon.awssdk.services.dynamodb.model.GetItemRequest; -import software.amazon.awssdk.services.dynamodb.model.GetItemResponse; -import software.amazon.awssdk.services.dynamodb.model.LimitExceededException; -import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput; -import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughputExceededException; -import software.amazon.awssdk.services.dynamodb.model.PutItemRequest; -import software.amazon.awssdk.services.dynamodb.model.ResourceInUseException; -import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException; -import software.amazon.awssdk.services.dynamodb.model.ScanRequest; -import software.amazon.awssdk.services.dynamodb.model.ScanResponse; -import software.amazon.awssdk.services.dynamodb.model.TableStatus; -import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest; +import software.amazon.awssdk.services.dynamodb.model.*; import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.common.FutureUtils; @@ -57,6 +38,7 @@ import software.amazon.kinesis.leases.exceptions.InvalidStateException; import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; import software.amazon.kinesis.retrieval.AWSExceptionManager; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; +import software.amazon.awssdk.services.dynamodb.model.BillingMode; /** * An implementation of {@link LeaseRefresher} that uses DynamoDB. @@ -72,6 +54,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { private final TableCreatorCallback tableCreatorCallback; private final Duration dynamoDbRequestTimeout; + private final BillingMode billingMode; private boolean newTableCreated = false; @@ -111,22 +94,41 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { /** * Constructor. - * @param table + * @param table * @param dynamoDBClient * @param serializer * @param consistentReads * @param tableCreatorCallback * @param dynamoDbRequestTimeout */ + @Deprecated public DynamoDBLeaseRefresher(final String table, final DynamoDbAsyncClient dynamoDBClient, final LeaseSerializer serializer, final boolean consistentReads, @NonNull final TableCreatorCallback tableCreatorCallback, Duration dynamoDbRequestTimeout) { + this(table, dynamoDBClient, serializer, consistentReads, tableCreatorCallback, dynamoDbRequestTimeout, BillingMode.PROVISIONED); + } + + /** + * Constructor. + * @param table + * @param dynamoDBClient + * @param serializer + * @param consistentReads + * @param tableCreatorCallback + * @param dynamoDbRequestTimeout + * @param billingMode + */ + public DynamoDBLeaseRefresher(final String table, final DynamoDbAsyncClient dynamoDBClient, + final LeaseSerializer serializer, final boolean consistentReads, + @NonNull final TableCreatorCallback tableCreatorCallback, Duration dynamoDbRequestTimeout, + final BillingMode billingMode) { this.table = table; this.dynamoDBClient = dynamoDBClient; this.serializer = serializer; this.consistentReads = consistentReads; this.tableCreatorCallback = tableCreatorCallback; this.dynamoDbRequestTimeout = dynamoDbRequestTimeout; + this.billingMode = billingMode; } /** @@ -147,8 +149,17 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { } ProvisionedThroughput throughput = ProvisionedThroughput.builder().readCapacityUnits(readCapacity) .writeCapacityUnits(writeCapacity).build(); - CreateTableRequest request = CreateTableRequest.builder().tableName(table).keySchema(serializer.getKeySchema()) - .attributeDefinitions(serializer.getAttributeDefinitions()).provisionedThroughput(throughput).build(); + final CreateTableRequest request; + if(BillingMode.PAY_PER_REQUEST.equals(billingMode)){ + request = CreateTableRequest.builder().tableName(table).keySchema(serializer.getKeySchema()) + .attributeDefinitions(serializer.getAttributeDefinitions()) + .billingMode(billingMode).build(); + }else{ + request = CreateTableRequest.builder().tableName(table).keySchema(serializer.getKeySchema()) + .attributeDefinitions(serializer.getAttributeDefinitions()).provisionedThroughput(throughput) + .billingMode(billingMode).build(); + } + final AWSExceptionManager exceptionManager = createExceptionManager(); exceptionManager.add(ResourceInUseException.class, t -> t); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseIntegrationBillingModePayPerRequestTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseIntegrationBillingModePayPerRequestTest.java new file mode 100644 index 00000000..dd069b19 --- /dev/null +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseIntegrationBillingModePayPerRequestTest.java @@ -0,0 +1,37 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package software.amazon.kinesis.leases; + +import lombok.extern.slf4j.Slf4j; +import org.junit.Rule; +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; +import org.mockito.Mock; +import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; +import software.amazon.awssdk.services.dynamodb.model.BillingMode; +import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRefresher; +import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseSerializer; +import software.amazon.kinesis.leases.dynamodb.TableCreatorCallback; + +@Slf4j +public class LeaseIntegrationBillingModePayPerRequestTest extends LeaseIntegrationTest { + @Override + protected DynamoDBLeaseRefresher getLeaseRefresher() { + return new DynamoDBLeaseRefresher(tableName+"Per-Request", ddbClient, leaseSerializer, true, + tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, BillingMode.PAY_PER_REQUEST); + } +} + diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseIntegrationTest.java index faeaebee..21d4e932 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseIntegrationTest.java @@ -22,13 +22,14 @@ import org.mockito.Mock; import lombok.extern.slf4j.Slf4j; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; +import software.amazon.awssdk.services.dynamodb.model.BillingMode; import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRefresher; import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseSerializer; import software.amazon.kinesis.leases.dynamodb.TableCreatorCallback; @Slf4j public class LeaseIntegrationTest { - private LeaseSerializer leaseSerializer = new DynamoDBLeaseSerializer(); + protected LeaseSerializer leaseSerializer = new DynamoDBLeaseSerializer(); protected static DynamoDBLeaseRefresher leaseRefresher; protected static DynamoDbAsyncClient ddbClient = DynamoDbAsyncClient.builder() @@ -47,8 +48,7 @@ public class LeaseIntegrationTest { if (leaseRefresher == null) { // Do some static setup once per class. - leaseRefresher = new DynamoDBLeaseRefresher(tableName, ddbClient, leaseSerializer, true, - tableCreatorCallback); + leaseRefresher = getLeaseRefresher(); } try { @@ -72,5 +72,10 @@ public class LeaseIntegrationTest { } }; + protected DynamoDBLeaseRefresher getLeaseRefresher() { + return new DynamoDBLeaseRefresher(tableName, ddbClient, leaseSerializer, true, + tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, BillingMode.PROVISIONED); + } + } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java index 5d5c7aa4..3e314347 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java @@ -56,10 +56,12 @@ import software.amazon.awssdk.services.dynamodb.model.ScanRequest; import software.amazon.awssdk.services.dynamodb.model.ScanResponse; import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest; import software.amazon.awssdk.services.dynamodb.model.UpdateItemResponse; +import software.amazon.awssdk.services.dynamodb.model.BillingMode; + import software.amazon.kinesis.leases.Lease; +import software.amazon.kinesis.leases.LeaseManagementConfig; import software.amazon.kinesis.leases.LeaseSerializer; import software.amazon.kinesis.leases.exceptions.DependencyException; - @RunWith(MockitoJUnitRunner.class) public class DynamoDBLeaseRefresherTest { @@ -232,6 +234,23 @@ public class DynamoDBLeaseRefresherTest { verifyCancel(mockCreateTableFuture, () -> leaseRefresher.createLeaseTableIfNotExists(10L, 10L)); } + @Test + public void testCreateLeaseTableBillingMode() throws Exception { + leaseRefresher = new DynamoDBLeaseRefresher(TABLE_NAME, dynamoDbClient, leaseSerializer, CONSISTENT_READS, + tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, BillingMode.PAY_PER_REQUEST); + + TimeoutException te = setRuleForDependencyTimeout(); + + when(dynamoDbClient.describeTable(any(DescribeTableRequest.class))).thenReturn(mockDescribeTableFuture); + when(mockDescribeTableFuture.get(anyLong(), any())) + .thenThrow(ResourceNotFoundException.builder().message("Table doesn't exist").build()); + + when(dynamoDbClient.createTable(any(CreateTableRequest.class))).thenReturn(mockCreateTableFuture); + when(mockCreateTableFuture.get(anyLong(), any())).thenThrow(te); + + verifyCancel(mockCreateTableFuture, () -> leaseRefresher.createLeaseTableIfNotExists(10L, 10L)); + } + @FunctionalInterface private interface TestCaller { void call() throws Exception; diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerIntegrationBillingModePayPerRequestTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerIntegrationBillingModePayPerRequestTest.java new file mode 100644 index 00000000..03df22cd --- /dev/null +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerIntegrationBillingModePayPerRequestTest.java @@ -0,0 +1,281 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package software.amazon.kinesis.leases.dynamodb; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.runners.MockitoJUnitRunner; +import software.amazon.kinesis.leases.Lease; +import software.amazon.kinesis.leases.LeaseIntegrationBillingModePayPerRequestTest; +import software.amazon.kinesis.leases.LeaseRenewer; +import software.amazon.kinesis.leases.exceptions.LeasingException; +import software.amazon.kinesis.metrics.NullMetricsFactory; +import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; + +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.Executors; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.junit.Assert.assertThat; + +@RunWith(MockitoJUnitRunner.class) +public class DynamoDBLeaseRenewerIntegrationBillingModePayPerRequestTest extends + LeaseIntegrationBillingModePayPerRequestTest { + private final String TEST_METRIC = "TestOperation"; + + // This test case's leases last 2 seconds + private static final long LEASE_DURATION_MILLIS = 2000L; + + private LeaseRenewer renewer; + + @Before + public void setup() { + renewer = new DynamoDBLeaseRenewer(leaseRefresher, "foo", LEASE_DURATION_MILLIS, + Executors.newCachedThreadPool(), new NullMetricsFactory()); + } + + @Test + public void testSimpleRenew() throws LeasingException { + TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher); + + builder.withLease("1", "foo").build(); + + builder.addLeasesToRenew(renewer, "1"); + builder.renewMutateAssert(renewer, "1"); + } + + @Test + public void testLeaseLoss() throws LeasingException { + TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher); + + builder.withLease("1", "foo").withLease("2", "foo").build(); + + builder.addLeasesToRenew(renewer, "1", "2"); + Lease renewedLease = builder.renewMutateAssert(renewer, "1", "2").get("2"); + + // lose lease 2 + leaseRefresher.takeLease(renewedLease, "bar"); + + builder.renewMutateAssert(renewer, "1"); + } + + @Test + public void testClear() throws LeasingException { + TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher); + + builder.withLease("1", "foo").build(); + builder.addLeasesToRenew(renewer, "1"); + builder.renewMutateAssert(renewer, "1"); + + renewer.clearCurrentlyHeldLeases(); + builder.renewMutateAssert(renewer); + } + + @Test + public void testGetCurrentlyHeldLease() throws LeasingException { + TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher); + + builder.withLease("1", "foo").build(); + builder.addLeasesToRenew(renewer, "1"); + builder.renewMutateAssert(renewer, "1"); + + // this should be a copy that doesn't get updated + Lease lease = renewer.getCurrentlyHeldLease("1"); + assertThat(lease.leaseCounter(), equalTo(1L)); + + // do one renewal and make sure the old copy doesn't get updated + builder.renewMutateAssert(renewer, "1"); + + assertThat(lease.leaseCounter(), equalTo(1L)); + } + + @Test + public void testGetCurrentlyHeldLeases() throws LeasingException { + TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher); + + builder.withLease("1", "foo").withLease("2", "foo").build(); + builder.addLeasesToRenew(renewer, "1", "2"); + Lease lease2 = builder.renewMutateAssert(renewer, "1", "2").get("2"); + + // This should be a copy that doesn't get updated + Map heldLeases = renewer.getCurrentlyHeldLeases(); + assertThat(heldLeases.size(), equalTo(2)); + assertThat(heldLeases.get("1").leaseCounter(), equalTo(1L)); + assertThat(heldLeases.get("2").leaseCounter(), equalTo(1L)); + + // lose lease 2 + leaseRefresher.takeLease(lease2, "bar"); + + // Do another renewal and make sure the copy doesn't change + builder.renewMutateAssert(renewer, "1"); + + assertThat(heldLeases.size(), equalTo(2)); + assertThat(heldLeases.get("1").leaseCounter(), equalTo(1L)); + assertThat(heldLeases.get("2").leaseCounter(), equalTo(1L)); + } + + @Test + public void testUpdateLease() throws LeasingException { + TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher); + + builder.withLease("1", "foo").build(); + + builder.addLeasesToRenew(renewer, "1"); + builder.renewMutateAssert(renewer, "1"); + + Lease expected = renewer.getCurrentlyHeldLease("1"); + expected.checkpoint(new ExtendedSequenceNumber("new checkpoint")); + assertThat(renewer.updateLease(expected, expected.concurrencyToken(), TEST_METRIC, expected.leaseKey()), + equalTo(true)); + + // Assert that the counter and data have changed immediately after the update... + Lease actual = renewer.getCurrentlyHeldLease("1"); + expected.leaseCounter(expected.leaseCounter() + 1); + assertThat(actual, equalTo(expected)); + + // ...and after another round of renewal + renewer.renewLeases(); + actual = renewer.getCurrentlyHeldLease("1"); + expected.leaseCounter(expected.leaseCounter() + 1); + assertThat(actual, equalTo(expected)); + } + + @Test + public void testUpdateLostLease() throws LeasingException { + TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher); + + builder.withLease("1", "foo").build(); + + builder.addLeasesToRenew(renewer, "1"); + builder.renewMutateAssert(renewer, "1"); + + Lease lease = renewer.getCurrentlyHeldLease("1"); + + // cause lease loss such that the renewer doesn't realize he's lost the lease when update is called + leaseRefresher.renewLease(lease); + + // renewer still thinks he has the lease + assertThat(renewer.getCurrentlyHeldLease("1"), notNullValue()); + lease.checkpoint(new ExtendedSequenceNumber("new checkpoint")); + + // update fails + assertThat(renewer.updateLease(lease, lease.concurrencyToken(), TEST_METRIC, null), equalTo(false)); + // renewer no longer thinks he has the lease + assertThat(renewer.getCurrentlyHeldLease("1"), nullValue()); + } + + @Test + public void testUpdateOldLease() throws LeasingException { + TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher); + + builder.withLease("1", "foo").build(); + + builder.addLeasesToRenew(renewer, "1"); + builder.renewMutateAssert(renewer, "1"); + + Lease lease = renewer.getCurrentlyHeldLease("1"); + + // cause lease loss such that the renewer knows the lease has been lost when update is called + leaseRefresher.takeLease(lease, "bar"); + builder.renewMutateAssert(renewer); + + lease.checkpoint(new ExtendedSequenceNumber("new checkpoint")); + assertThat(renewer.updateLease(lease, lease.concurrencyToken(), TEST_METRIC, lease.leaseKey()), equalTo(false)); + } + + @Test + public void testUpdateRegainedLease() throws LeasingException { + TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher); + + builder.withLease("1", "foo").build(); + + builder.addLeasesToRenew(renewer, "1"); + builder.renewMutateAssert(renewer, "1"); + + Lease lease = renewer.getCurrentlyHeldLease("1"); + + // cause lease loss such that the renewer knows the lease has been lost when update is called + leaseRefresher.takeLease(lease, "bar"); + builder.renewMutateAssert(renewer); + + // regain the lease + builder.addLeasesToRenew(renewer, "1"); + + lease.checkpoint(new ExtendedSequenceNumber("new checkpoint")); + assertThat(renewer.updateLease(lease, lease.concurrencyToken(), TEST_METRIC, lease.leaseKey()), equalTo(false)); + } + + @Test + public void testIgnoreNoRenewalTimestamp() throws LeasingException { + TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher); + + Lease lease = builder.withLease("1", "foo").build().get("1"); + lease.lastCounterIncrementNanos(null); + + renewer.addLeasesToRenew(Collections.singleton(lease)); + + assertThat(renewer.getCurrentlyHeldLeases().size(), equalTo(0)); + } + + @Test + public void testLeaseTimeout() throws LeasingException, InterruptedException { + TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher); + + builder.withLease("1", "foo").build(); + + builder.addLeasesToRenew(renewer, "1"); + builder.renewMutateAssert(renewer, "1"); + + // TODO: Worth eliminating this sleep using the same pattern we used on LeaseTaker? + Thread.sleep(LEASE_DURATION_MILLIS); // Wait for the lease to timeout + + assertThat(renewer.getCurrentlyHeldLeases().size(), equalTo(0)); + } + + @Test + public void testInitialize() throws LeasingException { + final String shardId = "shd-0-0"; + final String owner = "foo:8000"; + + TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher); + builder.withLease(shardId, owner); + Map leases = builder.build(); + DynamoDBLeaseRenewer renewer = new DynamoDBLeaseRenewer(leaseRefresher, owner, 30000L, + Executors.newCachedThreadPool(), new NullMetricsFactory()); + renewer.initialize(); + Map heldLeases = renewer.getCurrentlyHeldLeases(); + assertThat(heldLeases.size(), equalTo(leases.size())); + assertThat(heldLeases.keySet(), equalTo(leases.keySet())); + } + + @Test + public void testInitializeBillingMode() throws LeasingException { + final String shardId = "shd-0-0"; + final String owner = "foo:8000"; + TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher); + builder.withLease(shardId, owner); + Map leases = builder.build(); + DynamoDBLeaseRenewer renewer = new DynamoDBLeaseRenewer(leaseRefresher, owner, 30000L, + Executors.newCachedThreadPool(), new NullMetricsFactory()); + renewer.initialize(); + Map heldLeases = renewer.getCurrentlyHeldLeases(); + assertThat(heldLeases.size(), equalTo(leases.size())); + assertThat(heldLeases.keySet(), equalTo(leases.keySet())); + } +} diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerIntegrationTest.java index 5231a13e..7c884fd6 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerIntegrationTest.java @@ -262,4 +262,19 @@ public class DynamoDBLeaseRenewerIntegrationTest extends LeaseIntegrationTest { assertThat(heldLeases.size(), equalTo(leases.size())); assertThat(heldLeases.keySet(), equalTo(leases.keySet())); } + + @Test + public void testInitializeBillingMode() throws LeasingException { + final String shardId = "shd-0-0"; + final String owner = "foo:8000"; + TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher); + builder.withLease(shardId, owner); + Map leases = builder.build(); + DynamoDBLeaseRenewer renewer = new DynamoDBLeaseRenewer(leaseRefresher, owner, 30000L, + Executors.newCachedThreadPool(), new NullMetricsFactory()); + renewer.initialize(); + Map heldLeases = renewer.getCurrentlyHeldLeases(); + assertThat(heldLeases.size(), equalTo(leases.size())); + assertThat(heldLeases.keySet(), equalTo(leases.keySet())); + } }