From 2524ef83c3dc993dc15e82b35ff2c3c7106c566e Mon Sep 17 00:00:00 2001 From: Lucien Luc Date: Thu, 31 Oct 2024 15:31:07 -0700 Subject: [PATCH] Fix DDBLeaseManagementFactory constructor javadoc and remove deprecated annotation. Re-add missing unit tests from merge. --- .../DynamoDBLeaseManagementFactory.java | 4 +- .../dynamodb/DynamoDBLeaseRefresherTest.java | 170 +++++++++++++----- .../dynamodb/DynamoDBLeaseRenewerTest.java | 37 +++- 3 files changed, 163 insertions(+), 48 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java index 1d6923d9..412dca32 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java @@ -246,12 +246,14 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { * @param dynamoDbRequestTimeout * @param billingMode * @param leaseTableDeletionProtectionEnabled + * @param leaseTablePitrEnabled * @param leaseSerializer * @param customShardDetectorProvider * @param isMultiStreamMode * @param leaseCleanupConfig + * @param workerUtilizationAwareAssignmentConfig + * @param gracefulLeaseHandoffConfig */ - @Deprecated public DynamoDBLeaseManagementFactory( final KinesisAsyncClient kinesisClient, final DynamoDbAsyncClient dynamoDBClient, diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java index 270fca84..afdb346c 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java @@ -10,7 +10,6 @@ import java.util.concurrent.Executors; import com.amazonaws.services.dynamodbv2.local.embedded.DynamoDBEmbedded; import com.google.common.collect.ImmutableMap; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mockito; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; @@ -25,6 +24,8 @@ import software.amazon.awssdk.services.dynamodb.model.PutItemRequest; import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException; import software.amazon.awssdk.services.dynamodb.model.TableDescription; import software.amazon.awssdk.services.dynamodb.model.TableStatus; +import software.amazon.awssdk.services.dynamodb.model.UpdateContinuousBackupsRequest; +import software.amazon.awssdk.services.dynamodb.model.UpdateContinuousBackupsResponse; import software.amazon.kinesis.common.DdbTableConfig; import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.LeaseRefresher; @@ -53,20 +54,61 @@ import static software.amazon.kinesis.leases.dynamodb.TableCreatorCallback.NOOP_ class DynamoDBLeaseRefresherTest { private static final String TEST_LEASE_TABLE = "SomeTable"; - private DynamoDBLeaseRefresher leaseRefresher; private final DynamoDbAsyncClient dynamoDbAsyncClient = DynamoDBEmbedded.create().dynamoDbAsyncClient(); - @BeforeEach - void setup() throws ProvisionedThroughputException, DependencyException { - this.leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient); + @Test + void createLeaseTableWithPitr() throws DependencyException, ProvisionedThroughputException { + // DynamoDBLocal does not support PITR operations on table so using mocks + final DynamoDbAsyncClient mockDdbClient = mock(DynamoDbAsyncClient.class, Mockito.RETURNS_MOCKS); + DynamoDBLeaseRefresher dynamoDBLeaseRefresherWithPitr = + createLeaseRefresher(new DdbTableConfig(), mockDdbClient, false, true); - this.leaseRefresher.createLeaseTableIfNotExists(); - this.leaseRefresher.waitUntilLeaseTableExists(1, 30); + when(mockDdbClient.describeTable(any(DescribeTableRequest.class))) + .thenThrow(ResourceNotFoundException.builder() + .message("Mock table does not exist scenario") + .build()); + + final CompletableFuture future = new CompletableFuture<>(); + future.complete(UpdateContinuousBackupsResponse.builder().build()); + + when(mockDdbClient.updateContinuousBackups(any(UpdateContinuousBackupsRequest.class))) + .thenReturn(future); + + dynamoDBLeaseRefresherWithPitr.createLeaseTableIfNotExists(); + dynamoDBLeaseRefresherWithPitr.waitUntilLeaseTableExists(1, 30); + + UpdateContinuousBackupsRequest updateContinuousBackupsRequest = UpdateContinuousBackupsRequest.builder() + .tableName(TEST_LEASE_TABLE) + .pointInTimeRecoverySpecification(builder -> builder.pointInTimeRecoveryEnabled(true)) + .build(); + + verify(mockDdbClient, times(1)).updateContinuousBackups(updateContinuousBackupsRequest); } @Test - void createWorkerIdToLeaseKeyIndexIfNotExists_sanity() throws DependencyException { + void createLeaseTableWithDeletionProtection() throws DependencyException, ProvisionedThroughputException { + DynamoDBLeaseRefresher dynamoDBLeaseRefresherWithDeletionProtection = + createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient, true, false); + + dynamoDBLeaseRefresherWithDeletionProtection.createLeaseTableIfNotExists(); + dynamoDBLeaseRefresherWithDeletionProtection.waitUntilLeaseTableExists(1, 30); + + final DescribeTableResponse describeTableResponse = dynamoDbAsyncClient + .describeTable(DescribeTableRequest.builder() + .tableName(TEST_LEASE_TABLE) + .build()) + .join(); + + assertTrue(describeTableResponse.table().deletionProtectionEnabled()); + } + + @Test + void createWorkerIdToLeaseKeyIndexIfNotExists_sanity() throws DependencyException, ProvisionedThroughputException { + DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient); + leaseRefresher.createLeaseTableIfNotExists(); + leaseRefresher.waitUntilLeaseTableExists(1, 30); + assertFalse(leaseRefresher.isLeaseOwnerToLeaseKeyIndexActive()); final String creationResponse = leaseRefresher.createLeaseOwnerToLeaseKeyIndexIfNotExists(); @@ -95,10 +137,13 @@ class DynamoDBLeaseRefresherTest { describeTableResponse.table().globalSecondaryIndexes().get(0).indexStatus()); } - // TODO Add CreateLeaseTablePITR test and CreateLeaseTableDeletionProtection test - @Test - void waitUntilLeaseOwnerToLeaseKeyIndexExists_noTransitionToActive_assertFalse() throws DependencyException { + void waitUntilLeaseOwnerToLeaseKeyIndexExists_noTransitionToActive_assertFalse() + throws DependencyException, ProvisionedThroughputException { + DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient); + leaseRefresher.createLeaseTableIfNotExists(); + leaseRefresher.waitUntilLeaseTableExists(1, 30); + dynamoDbAsyncClient.deleteTable( DeleteTableRequest.builder().tableName(TEST_LEASE_TABLE).build()); @@ -108,7 +153,11 @@ class DynamoDBLeaseRefresherTest { } @Test - void isLeaseOwnerGsiIndexActive() throws DependencyException { + void isLeaseOwnerGsiIndexActive() throws DependencyException, ProvisionedThroughputException { + DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient); + leaseRefresher.createLeaseTableIfNotExists(); + leaseRefresher.waitUntilLeaseTableExists(1, 30); + final DynamoDbAsyncClient mockDdbClient = mock(DynamoDbAsyncClient.class, Mockito.RETURNS_MOCKS); final LeaseRefresher leaseRefresherForTest = new DynamoDBLeaseRefresher( TEST_LEASE_TABLE, @@ -210,7 +259,8 @@ class DynamoDBLeaseRefresherTest { @Test void assignLease_leaseWithPrevOwner_assertAssignmentToNewOwner() throws ProvisionedThroughputException, DependencyException, InvalidStateException { - setupTable(); + DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient); + setupTable(leaseRefresher); leaseRefresher.createLeaseIfNotExists(createDummyLease("lease1", "leaseOwner1")); // Fetch a lease from assign it to owner2 @@ -222,7 +272,8 @@ class DynamoDBLeaseRefresherTest { @Test void assignLease_unassignedLease_assertAssignmentToNewOwner() throws ProvisionedThroughputException, DependencyException, InvalidStateException { - setupTable(); + DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient); + setupTable(leaseRefresher); leaseRefresher.createLeaseIfNotExists(createDummyLease("lease1", null)); // Fetch a lease from assign it to owner2 @@ -235,7 +286,8 @@ class DynamoDBLeaseRefresherTest { @Test void assignLease_unAssignedLeaseGetsDeleted_assertAssignemntFailure() throws ProvisionedThroughputException, InvalidStateException, DependencyException { - setupTable(); + DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient); + setupTable(leaseRefresher); leaseRefresher.createLeaseIfNotExists(createDummyLease("lease1", null)); // Lease fetched before delete @@ -255,7 +307,8 @@ class DynamoDBLeaseRefresherTest { @Test void assignLease_AssignedLeaseGetsDeleted_assertAssignemntFailure() throws ProvisionedThroughputException, InvalidStateException, DependencyException { - setupTable(); + DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient); + setupTable(leaseRefresher); leaseRefresher.createLeaseIfNotExists(createDummyLease("lease1", "leaseOwner1")); // Lease fetched before delete @@ -279,7 +332,8 @@ class DynamoDBLeaseRefresherTest { @Test void assignLease_updatesOnTheLeaseFailsAfterAssignment() throws ProvisionedThroughputException, DependencyException, InvalidStateException { - setupTable(); + DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient); + setupTable(leaseRefresher); final Lease originalLeaseOnWorker = createDummyLease("lease1", "leaseOwner1"); leaseRefresher.createLeaseIfNotExists(originalLeaseOnWorker); @@ -318,7 +372,8 @@ class DynamoDBLeaseRefresherTest { @Test void listLeasesParallely_sanity() throws ProvisionedThroughputException, DependencyException, InvalidStateException { - setupTable(); + DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient); + setupTable(leaseRefresher); leaseRefresher.createLeaseIfNotExists(createDummyLease("lease1", "leaseOwner1")); leaseRefresher.createLeaseIfNotExists(createDummyLease("lease2", "leaseOwner2")); final Map.Entry, List> response = @@ -330,7 +385,8 @@ class DynamoDBLeaseRefresherTest { @Test void listLeasesParallely_leaseWithFailingDeserialization_assertCorrectResponse() throws ProvisionedThroughputException, DependencyException, InvalidStateException { - setupTable(); + DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient); + setupTable(leaseRefresher); leaseRefresher.createLeaseIfNotExists(createDummyLease("lease1", "leaseOwner1")); createAndPutBadLeaseEntryInTable(); final Map.Entry, List> response = @@ -343,7 +399,8 @@ class DynamoDBLeaseRefresherTest { @Test void initiateGracefulLeaseHandoff_sanity() throws Exception { - setupTable(); + DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient); + setupTable(leaseRefresher); final String nextOwner = "nextOwner"; final String currentOwner = "currentOwner"; final Lease lease = createDummyLease("lease1", currentOwner); @@ -357,7 +414,8 @@ class DynamoDBLeaseRefresherTest { @Test void initiateGracefulLeaseHandoff_conditionalFailure() throws Exception { - setupTable(); + DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient); + setupTable(leaseRefresher); final String nextOwner = "nextOwner"; final String currentOwner = "currentOwner"; final Lease lease = createDummyLease("lease1", currentOwner); @@ -369,7 +427,9 @@ class DynamoDBLeaseRefresherTest { @Test void renewLease_testGracefulShutdown_updateLeaseWhenDetectedShutdown() throws Exception { - leaseRefresher = spy(leaseRefresher); + DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient); + DynamoDBLeaseRefresher leaseRefresherSpy = spy(leaseRefresher); + setupTable(leaseRefresher); final String nextOwner = "nextOwner"; final String currentOwner = "currentOwner"; final Lease lease = createDummyLease("lease1", currentOwner); @@ -379,24 +439,28 @@ class DynamoDBLeaseRefresherTest { lease.checkpointOwner(null); lease.leaseOwner(currentOwner); // renew should see that the lease has the shutdown attributes and so mark them on the passed-in lease. - assertTrue(leaseRefresher.renewLease(lease)); + assertTrue(leaseRefresherSpy.renewLease(lease)); assertEquals(currentOwner, lease.checkpointOwner()); assertEquals(nextOwner, lease.leaseOwner()); assertEquals(lease, leaseRefresher.getLease(lease.leaseKey())); - verify(leaseRefresher, times(2)).renewLease(lease); + verify(leaseRefresherSpy, times(2)).renewLease(lease); } @Test void renewLease_testGracefulShutdown_conditionalFailureDueToNoLeaseInDdb_NotTryingToRenew() throws Exception { - leaseRefresher = spy(leaseRefresher); + DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient); + DynamoDBLeaseRefresher leaseRefresherSpy = spy(leaseRefresher); + setupTable(leaseRefresher); final Lease lease = createDummyLease("lease1", "currentOwner"); - assertFalse(leaseRefresher.renewLease(lease)); - verify(leaseRefresher, times(1)).renewLease(lease); + assertFalse(leaseRefresherSpy.renewLease(lease)); + verify(leaseRefresherSpy, times(1)).renewLease(lease); } @Test void renewLease_testGracefulShutdown_remoteLeaseHasDifferentOwner_NotTryingToRenew() throws Exception { - leaseRefresher = spy(leaseRefresher); + DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient); + setupTable(leaseRefresher); + DynamoDBLeaseRefresher leaseRefresherSpy = spy(leaseRefresher); final Lease lease = createDummyLease("lease1", "currentOwner"); final Lease originalLease = lease.copy(); leaseRefresher.createLeaseIfNotExists(lease); @@ -405,12 +469,14 @@ class DynamoDBLeaseRefresherTest { leaseRefresher.assignLease(lease, "nextOwner"); leaseRefresher.initiateGracefulLeaseHandoff(lease, "nextOwner2"); - assertFalse(leaseRefresher.renewLease(originalLease)); - verify(leaseRefresher, times(1)).renewLease(originalLease); + assertFalse(leaseRefresherSpy.renewLease(originalLease)); + verify(leaseRefresherSpy, times(1)).renewLease(originalLease); } @Test void renewLease_testGracefulShutdown_continueUpdateLeaseUntilLeaseIsTransferred() throws Exception { + DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient); + setupTable(leaseRefresher); final String nextOwner = "nextOwner"; final String currentOwner = "currentOwner"; final Lease lease = createDummyLease("lease1", currentOwner); @@ -435,7 +501,8 @@ class DynamoDBLeaseRefresherTest { @Test void assignLease_alwaysRemoveCheckpointOwner() throws Exception { - setupTable(); + DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient); + setupTable(leaseRefresher); final String nextOwner = "nextOwner"; final String currentOwner = "currentOwner"; final Lease lease = createDummyLease("lease1", currentOwner); @@ -451,7 +518,8 @@ class DynamoDBLeaseRefresherTest { @Test void assignLease_conditionalFailureBecauseCheckpointOwnerIsNotExpected() throws Exception { - setupTable(); + DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient); + setupTable(leaseRefresher); final String nextOwner = "nextOwner"; final String currentOwner = "currentOwner"; @@ -535,7 +603,7 @@ class DynamoDBLeaseRefresherTest { final LeaseRefresher leaseRefresher = createLeaseRefresher(createProvisionedTableConfig(), dbAsyncClient); // Creates base table and GSI - setupTable(leaseRefresher); + setupTableWithLeaseKeyIndex(leaseRefresher); final DescribeTableResponse describeTableResponse = dbAsyncClient .describeTable(DescribeTableRequest.builder() @@ -571,7 +639,7 @@ class DynamoDBLeaseRefresherTest { final LeaseRefresher leaseRefresher = createLeaseRefresher(createOnDemandTableConfig(), dbAsyncClient); // Creates base table and GSI - setupTable(leaseRefresher); + setupTableWithLeaseKeyIndex(leaseRefresher); final DescribeTableResponse describeTableResponse = dbAsyncClient .describeTable(DescribeTableRequest.builder() @@ -602,7 +670,9 @@ class DynamoDBLeaseRefresherTest { @Test public void takeLease_removesCheckpointOwner() throws Exception { - final Lease lease = createPendingCheckpointOwnerLease(); + DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient); + setupTable(leaseRefresher); + final Lease lease = createPendingCheckpointOwnerLease(leaseRefresher); assertTrue(leaseRefresher.takeLease(lease, "newOwner")); final Lease updatedLease = leaseRefresher.getLease(lease.leaseKey()); @@ -612,7 +682,9 @@ class DynamoDBLeaseRefresherTest { @Test public void evictLease_removesCheckpointOwner() throws Exception { - final Lease lease = createPendingCheckpointOwnerLease(); + DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient); + setupTable(leaseRefresher); + final Lease lease = createPendingCheckpointOwnerLease(leaseRefresher); final long originalCounter = lease.leaseCounter(); assertTrue(leaseRefresher.evictLease(lease)); @@ -625,6 +697,8 @@ class DynamoDBLeaseRefresherTest { @Test public void evictLease_removesOwnerIfCheckpointOwnerIsNull() throws Exception { + DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient); + setupTable(leaseRefresher); final Lease lease = createDummyLease("1", "ownerA"); final long originalCounter = lease.leaseCounter(); leaseRefresher.createLeaseIfNotExists(lease); @@ -639,6 +713,8 @@ class DynamoDBLeaseRefresherTest { @Test public void evictLease_noOpIfLeaseNotExists() throws Exception { + DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient); + setupTable(leaseRefresher); final Lease lease = createDummyLease("1", "ownerA"); assertFalse(leaseRefresher.evictLease(lease)); @@ -648,7 +724,7 @@ class DynamoDBLeaseRefresherTest { assertFalse(leaseRefresher.evictLease(lease)); } - private Lease createPendingCheckpointOwnerLease() throws Exception { + private Lease createPendingCheckpointOwnerLease(final LeaseRefresher leaseRefresher) throws Exception { final Lease lease = createDummyLease("1", "ownerA"); lease.checkpointOwner("checkpointOwner"); leaseRefresher.createLeaseIfNotExists(lease); @@ -701,6 +777,14 @@ class DynamoDBLeaseRefresherTest { private DynamoDBLeaseRefresher createLeaseRefresher( final DdbTableConfig ddbTableConfig, final DynamoDbAsyncClient dynamoDbAsyncClient) { + return createLeaseRefresher(ddbTableConfig, dynamoDbAsyncClient, false, false); + } + + private DynamoDBLeaseRefresher createLeaseRefresher( + final DdbTableConfig ddbTableConfig, + final DynamoDbAsyncClient dynamoDbAsyncClient, + boolean deletionProtectionEnabled, + boolean pitrEnabled) { return new DynamoDBLeaseRefresher( TEST_LEASE_TABLE, dynamoDbAsyncClient, @@ -709,8 +793,8 @@ class DynamoDBLeaseRefresherTest { NOOP_TABLE_CREATOR_CALLBACK, Duration.ofSeconds(10), ddbTableConfig, - true, - false, + deletionProtectionEnabled, + pitrEnabled, new ArrayList<>()); } @@ -722,11 +806,13 @@ class DynamoDBLeaseRefresherTest { return lease; } - private void setupTable() throws ProvisionedThroughputException, DependencyException { - setupTable(leaseRefresher); + private void setupTable(final LeaseRefresher refresher) throws ProvisionedThroughputException, DependencyException { + refresher.createLeaseTableIfNotExists(); + refresher.waitUntilLeaseTableExists(1, 100); } - private void setupTable(final LeaseRefresher refresher) throws ProvisionedThroughputException, DependencyException { + private void setupTableWithLeaseKeyIndex(final LeaseRefresher refresher) + throws ProvisionedThroughputException, DependencyException { refresher.createLeaseTableIfNotExists(); refresher.waitUntilLeaseTableExists(1, 100); refresher.createLeaseOwnerToLeaseKeyIndexIfNotExists(); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerTest.java index 6b7dc299..d6d0a5f6 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerTest.java @@ -2,6 +2,7 @@ package software.amazon.kinesis.leases.dynamodb; import java.time.Duration; import java.util.Map; +import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -34,12 +35,15 @@ import software.amazon.kinesis.metrics.NullMetricsFactory; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static software.amazon.kinesis.leases.dynamodb.TableCreatorCallback.NOOP_TABLE_CREATOR_CALLBACK; +import static software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber.LATEST; import static software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber.TRIM_HORIZON; class DynamoDBLeaseRenewerTest { @@ -125,7 +129,7 @@ class DynamoDBLeaseRenewerTest { @Test void renewLeases_enqueueShutdownRequestedLease_sanity() throws Exception { - createRenewer(); + createRenewer(leaseRefresher); final Lease lease = createDummyLease("key-1", WORKER_ID); leaseRefresher.createLeaseIfNotExists(lease); leaseRenewer.addLeasesToRenew(ImmutableList.of(lease)); @@ -186,7 +190,30 @@ class DynamoDBLeaseRenewerTest { assertTrue(leaseKeyToLeaseMap.containsKey("leaseKey4")); } - // TODO: add testLeaseRenewerDoesNotUpdateInMemoryLeaseIfDDBFailsUpdate + @Test + void testLeaseRenewerDoesNotUpdateInMemoryLeaseIfDDBFailsUpdate() throws Exception { + DynamoDBLeaseRefresher leaseRefresherMock = mock(DynamoDBLeaseRefresher.class, Mockito.RETURNS_MOCKS); + createRenewer(leaseRefresherMock); + + final String leaseKey = "leaseToUpdate"; + final Lease lease = createDummyLease(leaseKey, WORKER_ID); + leaseRenewer.addLeasesToRenew(ImmutableList.of(lease)); + final Lease updatedLease = createDummyLease(leaseKey, WORKER_ID); + updatedLease.checkpoint(LATEST); + + when(leaseRefresherMock.updateLease(updatedLease)).thenThrow(new DependencyException(new RuntimeException())); + try { + final UUID concurrencyToken = + leaseRenewer.getCurrentlyHeldLease(leaseKey).concurrencyToken(); + leaseRenewer.updateLease(updatedLease, concurrencyToken, "test", "dummyShardId"); + fail(); + } catch (DependencyException e) { + // expected + } + final Lease currentLease = leaseRenewer.getCurrentlyHeldLeases().get(leaseKey); + assertEquals(123L, currentLease.leaseCounter()); // leaseCounter should not be incremented due to DDB failure + assertEquals(TRIM_HORIZON, currentLease.checkpoint()); + } private void createAndPutBadLeaseEntryInTable() { final PutItemRequest putItemRequest = PutItemRequest.builder() @@ -198,7 +225,7 @@ class DynamoDBLeaseRenewerTest { dynamoDbAsyncClient.putItem(putItemRequest); } - private void createRenewer() throws Exception { + private void createRenewer(final DynamoDBLeaseRefresher leaseRefresher) throws Exception { when(mockExecutorService.submit(any(Callable.class))).thenAnswer(invocation -> { this.leaseRenewalCallable = (Callable) invocation.getArguments()[0]; return mockFuture; @@ -212,7 +239,7 @@ class DynamoDBLeaseRenewerTest { new NullMetricsFactory(), leaseStatsRecorder, mockLeaseGracefulShutdownCallBack); - this.leaseRefresher.createLeaseTableIfNotExists(); - this.leaseRefresher.waitUntilLeaseTableExists(1, 30); + leaseRefresher.createLeaseTableIfNotExists(); + leaseRefresher.waitUntilLeaseTableExists(1, 30); } }