Fix DDBLeaseManagementFactory constructor javadoc and remove deprecated annotation.
Re-add missing unit tests from merge.
This commit is contained in:
parent
a754364d29
commit
2524ef83c3
3 changed files with 163 additions and 48 deletions
|
|
@ -246,12 +246,14 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
|
||||||
* @param dynamoDbRequestTimeout
|
* @param dynamoDbRequestTimeout
|
||||||
* @param billingMode
|
* @param billingMode
|
||||||
* @param leaseTableDeletionProtectionEnabled
|
* @param leaseTableDeletionProtectionEnabled
|
||||||
|
* @param leaseTablePitrEnabled
|
||||||
* @param leaseSerializer
|
* @param leaseSerializer
|
||||||
* @param customShardDetectorProvider
|
* @param customShardDetectorProvider
|
||||||
* @param isMultiStreamMode
|
* @param isMultiStreamMode
|
||||||
* @param leaseCleanupConfig
|
* @param leaseCleanupConfig
|
||||||
|
* @param workerUtilizationAwareAssignmentConfig
|
||||||
|
* @param gracefulLeaseHandoffConfig
|
||||||
*/
|
*/
|
||||||
@Deprecated
|
|
||||||
public DynamoDBLeaseManagementFactory(
|
public DynamoDBLeaseManagementFactory(
|
||||||
final KinesisAsyncClient kinesisClient,
|
final KinesisAsyncClient kinesisClient,
|
||||||
final DynamoDbAsyncClient dynamoDBClient,
|
final DynamoDbAsyncClient dynamoDBClient,
|
||||||
|
|
|
||||||
|
|
@ -10,7 +10,6 @@ import java.util.concurrent.Executors;
|
||||||
|
|
||||||
import com.amazonaws.services.dynamodbv2.local.embedded.DynamoDBEmbedded;
|
import com.amazonaws.services.dynamodbv2.local.embedded.DynamoDBEmbedded;
|
||||||
import com.google.common.collect.ImmutableMap;
|
import com.google.common.collect.ImmutableMap;
|
||||||
import org.junit.jupiter.api.BeforeEach;
|
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
|
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
|
||||||
|
|
@ -25,6 +24,8 @@ import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
|
||||||
import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
|
import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
|
||||||
import software.amazon.awssdk.services.dynamodb.model.TableDescription;
|
import software.amazon.awssdk.services.dynamodb.model.TableDescription;
|
||||||
import software.amazon.awssdk.services.dynamodb.model.TableStatus;
|
import software.amazon.awssdk.services.dynamodb.model.TableStatus;
|
||||||
|
import software.amazon.awssdk.services.dynamodb.model.UpdateContinuousBackupsRequest;
|
||||||
|
import software.amazon.awssdk.services.dynamodb.model.UpdateContinuousBackupsResponse;
|
||||||
import software.amazon.kinesis.common.DdbTableConfig;
|
import software.amazon.kinesis.common.DdbTableConfig;
|
||||||
import software.amazon.kinesis.leases.Lease;
|
import software.amazon.kinesis.leases.Lease;
|
||||||
import software.amazon.kinesis.leases.LeaseRefresher;
|
import software.amazon.kinesis.leases.LeaseRefresher;
|
||||||
|
|
@ -53,20 +54,61 @@ import static software.amazon.kinesis.leases.dynamodb.TableCreatorCallback.NOOP_
|
||||||
class DynamoDBLeaseRefresherTest {
|
class DynamoDBLeaseRefresherTest {
|
||||||
|
|
||||||
private static final String TEST_LEASE_TABLE = "SomeTable";
|
private static final String TEST_LEASE_TABLE = "SomeTable";
|
||||||
private DynamoDBLeaseRefresher leaseRefresher;
|
|
||||||
private final DynamoDbAsyncClient dynamoDbAsyncClient =
|
private final DynamoDbAsyncClient dynamoDbAsyncClient =
|
||||||
DynamoDBEmbedded.create().dynamoDbAsyncClient();
|
DynamoDBEmbedded.create().dynamoDbAsyncClient();
|
||||||
|
|
||||||
@BeforeEach
|
@Test
|
||||||
void setup() throws ProvisionedThroughputException, DependencyException {
|
void createLeaseTableWithPitr() throws DependencyException, ProvisionedThroughputException {
|
||||||
this.leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient);
|
// DynamoDBLocal does not support PITR operations on table so using mocks
|
||||||
|
final DynamoDbAsyncClient mockDdbClient = mock(DynamoDbAsyncClient.class, Mockito.RETURNS_MOCKS);
|
||||||
|
DynamoDBLeaseRefresher dynamoDBLeaseRefresherWithPitr =
|
||||||
|
createLeaseRefresher(new DdbTableConfig(), mockDdbClient, false, true);
|
||||||
|
|
||||||
this.leaseRefresher.createLeaseTableIfNotExists();
|
when(mockDdbClient.describeTable(any(DescribeTableRequest.class)))
|
||||||
this.leaseRefresher.waitUntilLeaseTableExists(1, 30);
|
.thenThrow(ResourceNotFoundException.builder()
|
||||||
|
.message("Mock table does not exist scenario")
|
||||||
|
.build());
|
||||||
|
|
||||||
|
final CompletableFuture<UpdateContinuousBackupsResponse> future = new CompletableFuture<>();
|
||||||
|
future.complete(UpdateContinuousBackupsResponse.builder().build());
|
||||||
|
|
||||||
|
when(mockDdbClient.updateContinuousBackups(any(UpdateContinuousBackupsRequest.class)))
|
||||||
|
.thenReturn(future);
|
||||||
|
|
||||||
|
dynamoDBLeaseRefresherWithPitr.createLeaseTableIfNotExists();
|
||||||
|
dynamoDBLeaseRefresherWithPitr.waitUntilLeaseTableExists(1, 30);
|
||||||
|
|
||||||
|
UpdateContinuousBackupsRequest updateContinuousBackupsRequest = UpdateContinuousBackupsRequest.builder()
|
||||||
|
.tableName(TEST_LEASE_TABLE)
|
||||||
|
.pointInTimeRecoverySpecification(builder -> builder.pointInTimeRecoveryEnabled(true))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
verify(mockDdbClient, times(1)).updateContinuousBackups(updateContinuousBackupsRequest);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void createWorkerIdToLeaseKeyIndexIfNotExists_sanity() throws DependencyException {
|
void createLeaseTableWithDeletionProtection() throws DependencyException, ProvisionedThroughputException {
|
||||||
|
DynamoDBLeaseRefresher dynamoDBLeaseRefresherWithDeletionProtection =
|
||||||
|
createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient, true, false);
|
||||||
|
|
||||||
|
dynamoDBLeaseRefresherWithDeletionProtection.createLeaseTableIfNotExists();
|
||||||
|
dynamoDBLeaseRefresherWithDeletionProtection.waitUntilLeaseTableExists(1, 30);
|
||||||
|
|
||||||
|
final DescribeTableResponse describeTableResponse = dynamoDbAsyncClient
|
||||||
|
.describeTable(DescribeTableRequest.builder()
|
||||||
|
.tableName(TEST_LEASE_TABLE)
|
||||||
|
.build())
|
||||||
|
.join();
|
||||||
|
|
||||||
|
assertTrue(describeTableResponse.table().deletionProtectionEnabled());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void createWorkerIdToLeaseKeyIndexIfNotExists_sanity() throws DependencyException, ProvisionedThroughputException {
|
||||||
|
DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient);
|
||||||
|
leaseRefresher.createLeaseTableIfNotExists();
|
||||||
|
leaseRefresher.waitUntilLeaseTableExists(1, 30);
|
||||||
|
|
||||||
assertFalse(leaseRefresher.isLeaseOwnerToLeaseKeyIndexActive());
|
assertFalse(leaseRefresher.isLeaseOwnerToLeaseKeyIndexActive());
|
||||||
|
|
||||||
final String creationResponse = leaseRefresher.createLeaseOwnerToLeaseKeyIndexIfNotExists();
|
final String creationResponse = leaseRefresher.createLeaseOwnerToLeaseKeyIndexIfNotExists();
|
||||||
|
|
@ -95,10 +137,13 @@ class DynamoDBLeaseRefresherTest {
|
||||||
describeTableResponse.table().globalSecondaryIndexes().get(0).indexStatus());
|
describeTableResponse.table().globalSecondaryIndexes().get(0).indexStatus());
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO Add CreateLeaseTablePITR test and CreateLeaseTableDeletionProtection test
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void waitUntilLeaseOwnerToLeaseKeyIndexExists_noTransitionToActive_assertFalse() throws DependencyException {
|
void waitUntilLeaseOwnerToLeaseKeyIndexExists_noTransitionToActive_assertFalse()
|
||||||
|
throws DependencyException, ProvisionedThroughputException {
|
||||||
|
DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient);
|
||||||
|
leaseRefresher.createLeaseTableIfNotExists();
|
||||||
|
leaseRefresher.waitUntilLeaseTableExists(1, 30);
|
||||||
|
|
||||||
dynamoDbAsyncClient.deleteTable(
|
dynamoDbAsyncClient.deleteTable(
|
||||||
DeleteTableRequest.builder().tableName(TEST_LEASE_TABLE).build());
|
DeleteTableRequest.builder().tableName(TEST_LEASE_TABLE).build());
|
||||||
|
|
||||||
|
|
@ -108,7 +153,11 @@ class DynamoDBLeaseRefresherTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void isLeaseOwnerGsiIndexActive() throws DependencyException {
|
void isLeaseOwnerGsiIndexActive() throws DependencyException, ProvisionedThroughputException {
|
||||||
|
DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient);
|
||||||
|
leaseRefresher.createLeaseTableIfNotExists();
|
||||||
|
leaseRefresher.waitUntilLeaseTableExists(1, 30);
|
||||||
|
|
||||||
final DynamoDbAsyncClient mockDdbClient = mock(DynamoDbAsyncClient.class, Mockito.RETURNS_MOCKS);
|
final DynamoDbAsyncClient mockDdbClient = mock(DynamoDbAsyncClient.class, Mockito.RETURNS_MOCKS);
|
||||||
final LeaseRefresher leaseRefresherForTest = new DynamoDBLeaseRefresher(
|
final LeaseRefresher leaseRefresherForTest = new DynamoDBLeaseRefresher(
|
||||||
TEST_LEASE_TABLE,
|
TEST_LEASE_TABLE,
|
||||||
|
|
@ -210,7 +259,8 @@ class DynamoDBLeaseRefresherTest {
|
||||||
@Test
|
@Test
|
||||||
void assignLease_leaseWithPrevOwner_assertAssignmentToNewOwner()
|
void assignLease_leaseWithPrevOwner_assertAssignmentToNewOwner()
|
||||||
throws ProvisionedThroughputException, DependencyException, InvalidStateException {
|
throws ProvisionedThroughputException, DependencyException, InvalidStateException {
|
||||||
setupTable();
|
DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient);
|
||||||
|
setupTable(leaseRefresher);
|
||||||
leaseRefresher.createLeaseIfNotExists(createDummyLease("lease1", "leaseOwner1"));
|
leaseRefresher.createLeaseIfNotExists(createDummyLease("lease1", "leaseOwner1"));
|
||||||
|
|
||||||
// Fetch a lease from assign it to owner2
|
// Fetch a lease from assign it to owner2
|
||||||
|
|
@ -222,7 +272,8 @@ class DynamoDBLeaseRefresherTest {
|
||||||
@Test
|
@Test
|
||||||
void assignLease_unassignedLease_assertAssignmentToNewOwner()
|
void assignLease_unassignedLease_assertAssignmentToNewOwner()
|
||||||
throws ProvisionedThroughputException, DependencyException, InvalidStateException {
|
throws ProvisionedThroughputException, DependencyException, InvalidStateException {
|
||||||
setupTable();
|
DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient);
|
||||||
|
setupTable(leaseRefresher);
|
||||||
leaseRefresher.createLeaseIfNotExists(createDummyLease("lease1", null));
|
leaseRefresher.createLeaseIfNotExists(createDummyLease("lease1", null));
|
||||||
|
|
||||||
// Fetch a lease from assign it to owner2
|
// Fetch a lease from assign it to owner2
|
||||||
|
|
@ -235,7 +286,8 @@ class DynamoDBLeaseRefresherTest {
|
||||||
@Test
|
@Test
|
||||||
void assignLease_unAssignedLeaseGetsDeleted_assertAssignemntFailure()
|
void assignLease_unAssignedLeaseGetsDeleted_assertAssignemntFailure()
|
||||||
throws ProvisionedThroughputException, InvalidStateException, DependencyException {
|
throws ProvisionedThroughputException, InvalidStateException, DependencyException {
|
||||||
setupTable();
|
DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient);
|
||||||
|
setupTable(leaseRefresher);
|
||||||
leaseRefresher.createLeaseIfNotExists(createDummyLease("lease1", null));
|
leaseRefresher.createLeaseIfNotExists(createDummyLease("lease1", null));
|
||||||
|
|
||||||
// Lease fetched before delete
|
// Lease fetched before delete
|
||||||
|
|
@ -255,7 +307,8 @@ class DynamoDBLeaseRefresherTest {
|
||||||
@Test
|
@Test
|
||||||
void assignLease_AssignedLeaseGetsDeleted_assertAssignemntFailure()
|
void assignLease_AssignedLeaseGetsDeleted_assertAssignemntFailure()
|
||||||
throws ProvisionedThroughputException, InvalidStateException, DependencyException {
|
throws ProvisionedThroughputException, InvalidStateException, DependencyException {
|
||||||
setupTable();
|
DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient);
|
||||||
|
setupTable(leaseRefresher);
|
||||||
leaseRefresher.createLeaseIfNotExists(createDummyLease("lease1", "leaseOwner1"));
|
leaseRefresher.createLeaseIfNotExists(createDummyLease("lease1", "leaseOwner1"));
|
||||||
|
|
||||||
// Lease fetched before delete
|
// Lease fetched before delete
|
||||||
|
|
@ -279,7 +332,8 @@ class DynamoDBLeaseRefresherTest {
|
||||||
@Test
|
@Test
|
||||||
void assignLease_updatesOnTheLeaseFailsAfterAssignment()
|
void assignLease_updatesOnTheLeaseFailsAfterAssignment()
|
||||||
throws ProvisionedThroughputException, DependencyException, InvalidStateException {
|
throws ProvisionedThroughputException, DependencyException, InvalidStateException {
|
||||||
setupTable();
|
DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient);
|
||||||
|
setupTable(leaseRefresher);
|
||||||
final Lease originalLeaseOnWorker = createDummyLease("lease1", "leaseOwner1");
|
final Lease originalLeaseOnWorker = createDummyLease("lease1", "leaseOwner1");
|
||||||
leaseRefresher.createLeaseIfNotExists(originalLeaseOnWorker);
|
leaseRefresher.createLeaseIfNotExists(originalLeaseOnWorker);
|
||||||
|
|
||||||
|
|
@ -318,7 +372,8 @@ class DynamoDBLeaseRefresherTest {
|
||||||
@Test
|
@Test
|
||||||
void listLeasesParallely_sanity()
|
void listLeasesParallely_sanity()
|
||||||
throws ProvisionedThroughputException, DependencyException, InvalidStateException {
|
throws ProvisionedThroughputException, DependencyException, InvalidStateException {
|
||||||
setupTable();
|
DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient);
|
||||||
|
setupTable(leaseRefresher);
|
||||||
leaseRefresher.createLeaseIfNotExists(createDummyLease("lease1", "leaseOwner1"));
|
leaseRefresher.createLeaseIfNotExists(createDummyLease("lease1", "leaseOwner1"));
|
||||||
leaseRefresher.createLeaseIfNotExists(createDummyLease("lease2", "leaseOwner2"));
|
leaseRefresher.createLeaseIfNotExists(createDummyLease("lease2", "leaseOwner2"));
|
||||||
final Map.Entry<List<Lease>, List<String>> response =
|
final Map.Entry<List<Lease>, List<String>> response =
|
||||||
|
|
@ -330,7 +385,8 @@ class DynamoDBLeaseRefresherTest {
|
||||||
@Test
|
@Test
|
||||||
void listLeasesParallely_leaseWithFailingDeserialization_assertCorrectResponse()
|
void listLeasesParallely_leaseWithFailingDeserialization_assertCorrectResponse()
|
||||||
throws ProvisionedThroughputException, DependencyException, InvalidStateException {
|
throws ProvisionedThroughputException, DependencyException, InvalidStateException {
|
||||||
setupTable();
|
DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient);
|
||||||
|
setupTable(leaseRefresher);
|
||||||
leaseRefresher.createLeaseIfNotExists(createDummyLease("lease1", "leaseOwner1"));
|
leaseRefresher.createLeaseIfNotExists(createDummyLease("lease1", "leaseOwner1"));
|
||||||
createAndPutBadLeaseEntryInTable();
|
createAndPutBadLeaseEntryInTable();
|
||||||
final Map.Entry<List<Lease>, List<String>> response =
|
final Map.Entry<List<Lease>, List<String>> response =
|
||||||
|
|
@ -343,7 +399,8 @@ class DynamoDBLeaseRefresherTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void initiateGracefulLeaseHandoff_sanity() throws Exception {
|
void initiateGracefulLeaseHandoff_sanity() throws Exception {
|
||||||
setupTable();
|
DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient);
|
||||||
|
setupTable(leaseRefresher);
|
||||||
final String nextOwner = "nextOwner";
|
final String nextOwner = "nextOwner";
|
||||||
final String currentOwner = "currentOwner";
|
final String currentOwner = "currentOwner";
|
||||||
final Lease lease = createDummyLease("lease1", currentOwner);
|
final Lease lease = createDummyLease("lease1", currentOwner);
|
||||||
|
|
@ -357,7 +414,8 @@ class DynamoDBLeaseRefresherTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void initiateGracefulLeaseHandoff_conditionalFailure() throws Exception {
|
void initiateGracefulLeaseHandoff_conditionalFailure() throws Exception {
|
||||||
setupTable();
|
DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient);
|
||||||
|
setupTable(leaseRefresher);
|
||||||
final String nextOwner = "nextOwner";
|
final String nextOwner = "nextOwner";
|
||||||
final String currentOwner = "currentOwner";
|
final String currentOwner = "currentOwner";
|
||||||
final Lease lease = createDummyLease("lease1", currentOwner);
|
final Lease lease = createDummyLease("lease1", currentOwner);
|
||||||
|
|
@ -369,7 +427,9 @@ class DynamoDBLeaseRefresherTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void renewLease_testGracefulShutdown_updateLeaseWhenDetectedShutdown() throws Exception {
|
void renewLease_testGracefulShutdown_updateLeaseWhenDetectedShutdown() throws Exception {
|
||||||
leaseRefresher = spy(leaseRefresher);
|
DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient);
|
||||||
|
DynamoDBLeaseRefresher leaseRefresherSpy = spy(leaseRefresher);
|
||||||
|
setupTable(leaseRefresher);
|
||||||
final String nextOwner = "nextOwner";
|
final String nextOwner = "nextOwner";
|
||||||
final String currentOwner = "currentOwner";
|
final String currentOwner = "currentOwner";
|
||||||
final Lease lease = createDummyLease("lease1", currentOwner);
|
final Lease lease = createDummyLease("lease1", currentOwner);
|
||||||
|
|
@ -379,24 +439,28 @@ class DynamoDBLeaseRefresherTest {
|
||||||
lease.checkpointOwner(null);
|
lease.checkpointOwner(null);
|
||||||
lease.leaseOwner(currentOwner);
|
lease.leaseOwner(currentOwner);
|
||||||
// renew should see that the lease has the shutdown attributes and so mark them on the passed-in lease.
|
// renew should see that the lease has the shutdown attributes and so mark them on the passed-in lease.
|
||||||
assertTrue(leaseRefresher.renewLease(lease));
|
assertTrue(leaseRefresherSpy.renewLease(lease));
|
||||||
assertEquals(currentOwner, lease.checkpointOwner());
|
assertEquals(currentOwner, lease.checkpointOwner());
|
||||||
assertEquals(nextOwner, lease.leaseOwner());
|
assertEquals(nextOwner, lease.leaseOwner());
|
||||||
assertEquals(lease, leaseRefresher.getLease(lease.leaseKey()));
|
assertEquals(lease, leaseRefresher.getLease(lease.leaseKey()));
|
||||||
verify(leaseRefresher, times(2)).renewLease(lease);
|
verify(leaseRefresherSpy, times(2)).renewLease(lease);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void renewLease_testGracefulShutdown_conditionalFailureDueToNoLeaseInDdb_NotTryingToRenew() throws Exception {
|
void renewLease_testGracefulShutdown_conditionalFailureDueToNoLeaseInDdb_NotTryingToRenew() throws Exception {
|
||||||
leaseRefresher = spy(leaseRefresher);
|
DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient);
|
||||||
|
DynamoDBLeaseRefresher leaseRefresherSpy = spy(leaseRefresher);
|
||||||
|
setupTable(leaseRefresher);
|
||||||
final Lease lease = createDummyLease("lease1", "currentOwner");
|
final Lease lease = createDummyLease("lease1", "currentOwner");
|
||||||
assertFalse(leaseRefresher.renewLease(lease));
|
assertFalse(leaseRefresherSpy.renewLease(lease));
|
||||||
verify(leaseRefresher, times(1)).renewLease(lease);
|
verify(leaseRefresherSpy, times(1)).renewLease(lease);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void renewLease_testGracefulShutdown_remoteLeaseHasDifferentOwner_NotTryingToRenew() throws Exception {
|
void renewLease_testGracefulShutdown_remoteLeaseHasDifferentOwner_NotTryingToRenew() throws Exception {
|
||||||
leaseRefresher = spy(leaseRefresher);
|
DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient);
|
||||||
|
setupTable(leaseRefresher);
|
||||||
|
DynamoDBLeaseRefresher leaseRefresherSpy = spy(leaseRefresher);
|
||||||
final Lease lease = createDummyLease("lease1", "currentOwner");
|
final Lease lease = createDummyLease("lease1", "currentOwner");
|
||||||
final Lease originalLease = lease.copy();
|
final Lease originalLease = lease.copy();
|
||||||
leaseRefresher.createLeaseIfNotExists(lease);
|
leaseRefresher.createLeaseIfNotExists(lease);
|
||||||
|
|
@ -405,12 +469,14 @@ class DynamoDBLeaseRefresherTest {
|
||||||
leaseRefresher.assignLease(lease, "nextOwner");
|
leaseRefresher.assignLease(lease, "nextOwner");
|
||||||
leaseRefresher.initiateGracefulLeaseHandoff(lease, "nextOwner2");
|
leaseRefresher.initiateGracefulLeaseHandoff(lease, "nextOwner2");
|
||||||
|
|
||||||
assertFalse(leaseRefresher.renewLease(originalLease));
|
assertFalse(leaseRefresherSpy.renewLease(originalLease));
|
||||||
verify(leaseRefresher, times(1)).renewLease(originalLease);
|
verify(leaseRefresherSpy, times(1)).renewLease(originalLease);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void renewLease_testGracefulShutdown_continueUpdateLeaseUntilLeaseIsTransferred() throws Exception {
|
void renewLease_testGracefulShutdown_continueUpdateLeaseUntilLeaseIsTransferred() throws Exception {
|
||||||
|
DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient);
|
||||||
|
setupTable(leaseRefresher);
|
||||||
final String nextOwner = "nextOwner";
|
final String nextOwner = "nextOwner";
|
||||||
final String currentOwner = "currentOwner";
|
final String currentOwner = "currentOwner";
|
||||||
final Lease lease = createDummyLease("lease1", currentOwner);
|
final Lease lease = createDummyLease("lease1", currentOwner);
|
||||||
|
|
@ -435,7 +501,8 @@ class DynamoDBLeaseRefresherTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void assignLease_alwaysRemoveCheckpointOwner() throws Exception {
|
void assignLease_alwaysRemoveCheckpointOwner() throws Exception {
|
||||||
setupTable();
|
DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient);
|
||||||
|
setupTable(leaseRefresher);
|
||||||
final String nextOwner = "nextOwner";
|
final String nextOwner = "nextOwner";
|
||||||
final String currentOwner = "currentOwner";
|
final String currentOwner = "currentOwner";
|
||||||
final Lease lease = createDummyLease("lease1", currentOwner);
|
final Lease lease = createDummyLease("lease1", currentOwner);
|
||||||
|
|
@ -451,7 +518,8 @@ class DynamoDBLeaseRefresherTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void assignLease_conditionalFailureBecauseCheckpointOwnerIsNotExpected() throws Exception {
|
void assignLease_conditionalFailureBecauseCheckpointOwnerIsNotExpected() throws Exception {
|
||||||
setupTable();
|
DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient);
|
||||||
|
setupTable(leaseRefresher);
|
||||||
final String nextOwner = "nextOwner";
|
final String nextOwner = "nextOwner";
|
||||||
final String currentOwner = "currentOwner";
|
final String currentOwner = "currentOwner";
|
||||||
|
|
||||||
|
|
@ -535,7 +603,7 @@ class DynamoDBLeaseRefresherTest {
|
||||||
final LeaseRefresher leaseRefresher = createLeaseRefresher(createProvisionedTableConfig(), dbAsyncClient);
|
final LeaseRefresher leaseRefresher = createLeaseRefresher(createProvisionedTableConfig(), dbAsyncClient);
|
||||||
|
|
||||||
// Creates base table and GSI
|
// Creates base table and GSI
|
||||||
setupTable(leaseRefresher);
|
setupTableWithLeaseKeyIndex(leaseRefresher);
|
||||||
|
|
||||||
final DescribeTableResponse describeTableResponse = dbAsyncClient
|
final DescribeTableResponse describeTableResponse = dbAsyncClient
|
||||||
.describeTable(DescribeTableRequest.builder()
|
.describeTable(DescribeTableRequest.builder()
|
||||||
|
|
@ -571,7 +639,7 @@ class DynamoDBLeaseRefresherTest {
|
||||||
final LeaseRefresher leaseRefresher = createLeaseRefresher(createOnDemandTableConfig(), dbAsyncClient);
|
final LeaseRefresher leaseRefresher = createLeaseRefresher(createOnDemandTableConfig(), dbAsyncClient);
|
||||||
|
|
||||||
// Creates base table and GSI
|
// Creates base table and GSI
|
||||||
setupTable(leaseRefresher);
|
setupTableWithLeaseKeyIndex(leaseRefresher);
|
||||||
|
|
||||||
final DescribeTableResponse describeTableResponse = dbAsyncClient
|
final DescribeTableResponse describeTableResponse = dbAsyncClient
|
||||||
.describeTable(DescribeTableRequest.builder()
|
.describeTable(DescribeTableRequest.builder()
|
||||||
|
|
@ -602,7 +670,9 @@ class DynamoDBLeaseRefresherTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void takeLease_removesCheckpointOwner() throws Exception {
|
public void takeLease_removesCheckpointOwner() throws Exception {
|
||||||
final Lease lease = createPendingCheckpointOwnerLease();
|
DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient);
|
||||||
|
setupTable(leaseRefresher);
|
||||||
|
final Lease lease = createPendingCheckpointOwnerLease(leaseRefresher);
|
||||||
assertTrue(leaseRefresher.takeLease(lease, "newOwner"));
|
assertTrue(leaseRefresher.takeLease(lease, "newOwner"));
|
||||||
|
|
||||||
final Lease updatedLease = leaseRefresher.getLease(lease.leaseKey());
|
final Lease updatedLease = leaseRefresher.getLease(lease.leaseKey());
|
||||||
|
|
@ -612,7 +682,9 @@ class DynamoDBLeaseRefresherTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void evictLease_removesCheckpointOwner() throws Exception {
|
public void evictLease_removesCheckpointOwner() throws Exception {
|
||||||
final Lease lease = createPendingCheckpointOwnerLease();
|
DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient);
|
||||||
|
setupTable(leaseRefresher);
|
||||||
|
final Lease lease = createPendingCheckpointOwnerLease(leaseRefresher);
|
||||||
final long originalCounter = lease.leaseCounter();
|
final long originalCounter = lease.leaseCounter();
|
||||||
assertTrue(leaseRefresher.evictLease(lease));
|
assertTrue(leaseRefresher.evictLease(lease));
|
||||||
|
|
||||||
|
|
@ -625,6 +697,8 @@ class DynamoDBLeaseRefresherTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void evictLease_removesOwnerIfCheckpointOwnerIsNull() throws Exception {
|
public void evictLease_removesOwnerIfCheckpointOwnerIsNull() throws Exception {
|
||||||
|
DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient);
|
||||||
|
setupTable(leaseRefresher);
|
||||||
final Lease lease = createDummyLease("1", "ownerA");
|
final Lease lease = createDummyLease("1", "ownerA");
|
||||||
final long originalCounter = lease.leaseCounter();
|
final long originalCounter = lease.leaseCounter();
|
||||||
leaseRefresher.createLeaseIfNotExists(lease);
|
leaseRefresher.createLeaseIfNotExists(lease);
|
||||||
|
|
@ -639,6 +713,8 @@ class DynamoDBLeaseRefresherTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void evictLease_noOpIfLeaseNotExists() throws Exception {
|
public void evictLease_noOpIfLeaseNotExists() throws Exception {
|
||||||
|
DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient);
|
||||||
|
setupTable(leaseRefresher);
|
||||||
final Lease lease = createDummyLease("1", "ownerA");
|
final Lease lease = createDummyLease("1", "ownerA");
|
||||||
assertFalse(leaseRefresher.evictLease(lease));
|
assertFalse(leaseRefresher.evictLease(lease));
|
||||||
|
|
||||||
|
|
@ -648,7 +724,7 @@ class DynamoDBLeaseRefresherTest {
|
||||||
assertFalse(leaseRefresher.evictLease(lease));
|
assertFalse(leaseRefresher.evictLease(lease));
|
||||||
}
|
}
|
||||||
|
|
||||||
private Lease createPendingCheckpointOwnerLease() throws Exception {
|
private Lease createPendingCheckpointOwnerLease(final LeaseRefresher leaseRefresher) throws Exception {
|
||||||
final Lease lease = createDummyLease("1", "ownerA");
|
final Lease lease = createDummyLease("1", "ownerA");
|
||||||
lease.checkpointOwner("checkpointOwner");
|
lease.checkpointOwner("checkpointOwner");
|
||||||
leaseRefresher.createLeaseIfNotExists(lease);
|
leaseRefresher.createLeaseIfNotExists(lease);
|
||||||
|
|
@ -701,6 +777,14 @@ class DynamoDBLeaseRefresherTest {
|
||||||
|
|
||||||
private DynamoDBLeaseRefresher createLeaseRefresher(
|
private DynamoDBLeaseRefresher createLeaseRefresher(
|
||||||
final DdbTableConfig ddbTableConfig, final DynamoDbAsyncClient dynamoDbAsyncClient) {
|
final DdbTableConfig ddbTableConfig, final DynamoDbAsyncClient dynamoDbAsyncClient) {
|
||||||
|
return createLeaseRefresher(ddbTableConfig, dynamoDbAsyncClient, false, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private DynamoDBLeaseRefresher createLeaseRefresher(
|
||||||
|
final DdbTableConfig ddbTableConfig,
|
||||||
|
final DynamoDbAsyncClient dynamoDbAsyncClient,
|
||||||
|
boolean deletionProtectionEnabled,
|
||||||
|
boolean pitrEnabled) {
|
||||||
return new DynamoDBLeaseRefresher(
|
return new DynamoDBLeaseRefresher(
|
||||||
TEST_LEASE_TABLE,
|
TEST_LEASE_TABLE,
|
||||||
dynamoDbAsyncClient,
|
dynamoDbAsyncClient,
|
||||||
|
|
@ -709,8 +793,8 @@ class DynamoDBLeaseRefresherTest {
|
||||||
NOOP_TABLE_CREATOR_CALLBACK,
|
NOOP_TABLE_CREATOR_CALLBACK,
|
||||||
Duration.ofSeconds(10),
|
Duration.ofSeconds(10),
|
||||||
ddbTableConfig,
|
ddbTableConfig,
|
||||||
true,
|
deletionProtectionEnabled,
|
||||||
false,
|
pitrEnabled,
|
||||||
new ArrayList<>());
|
new ArrayList<>());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -722,11 +806,13 @@ class DynamoDBLeaseRefresherTest {
|
||||||
return lease;
|
return lease;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void setupTable() throws ProvisionedThroughputException, DependencyException {
|
private void setupTable(final LeaseRefresher refresher) throws ProvisionedThroughputException, DependencyException {
|
||||||
setupTable(leaseRefresher);
|
refresher.createLeaseTableIfNotExists();
|
||||||
|
refresher.waitUntilLeaseTableExists(1, 100);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void setupTable(final LeaseRefresher refresher) throws ProvisionedThroughputException, DependencyException {
|
private void setupTableWithLeaseKeyIndex(final LeaseRefresher refresher)
|
||||||
|
throws ProvisionedThroughputException, DependencyException {
|
||||||
refresher.createLeaseTableIfNotExists();
|
refresher.createLeaseTableIfNotExists();
|
||||||
refresher.waitUntilLeaseTableExists(1, 100);
|
refresher.waitUntilLeaseTableExists(1, 100);
|
||||||
refresher.createLeaseOwnerToLeaseKeyIndexIfNotExists();
|
refresher.createLeaseOwnerToLeaseKeyIndexIfNotExists();
|
||||||
|
|
|
||||||
|
|
@ -2,6 +2,7 @@ package software.amazon.kinesis.leases.dynamodb;
|
||||||
|
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.UUID;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
|
|
@ -34,12 +35,15 @@ import software.amazon.kinesis.metrics.NullMetricsFactory;
|
||||||
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
import static org.junit.jupiter.api.Assertions.fail;
|
||||||
import static org.mockito.ArgumentMatchers.any;
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.never;
|
import static org.mockito.Mockito.never;
|
||||||
import static org.mockito.Mockito.times;
|
import static org.mockito.Mockito.times;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
import static software.amazon.kinesis.leases.dynamodb.TableCreatorCallback.NOOP_TABLE_CREATOR_CALLBACK;
|
import static software.amazon.kinesis.leases.dynamodb.TableCreatorCallback.NOOP_TABLE_CREATOR_CALLBACK;
|
||||||
|
import static software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber.LATEST;
|
||||||
import static software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber.TRIM_HORIZON;
|
import static software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber.TRIM_HORIZON;
|
||||||
|
|
||||||
class DynamoDBLeaseRenewerTest {
|
class DynamoDBLeaseRenewerTest {
|
||||||
|
|
@ -125,7 +129,7 @@ class DynamoDBLeaseRenewerTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void renewLeases_enqueueShutdownRequestedLease_sanity() throws Exception {
|
void renewLeases_enqueueShutdownRequestedLease_sanity() throws Exception {
|
||||||
createRenewer();
|
createRenewer(leaseRefresher);
|
||||||
final Lease lease = createDummyLease("key-1", WORKER_ID);
|
final Lease lease = createDummyLease("key-1", WORKER_ID);
|
||||||
leaseRefresher.createLeaseIfNotExists(lease);
|
leaseRefresher.createLeaseIfNotExists(lease);
|
||||||
leaseRenewer.addLeasesToRenew(ImmutableList.of(lease));
|
leaseRenewer.addLeasesToRenew(ImmutableList.of(lease));
|
||||||
|
|
@ -186,7 +190,30 @@ class DynamoDBLeaseRenewerTest {
|
||||||
assertTrue(leaseKeyToLeaseMap.containsKey("leaseKey4"));
|
assertTrue(leaseKeyToLeaseMap.containsKey("leaseKey4"));
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: add testLeaseRenewerDoesNotUpdateInMemoryLeaseIfDDBFailsUpdate
|
@Test
|
||||||
|
void testLeaseRenewerDoesNotUpdateInMemoryLeaseIfDDBFailsUpdate() throws Exception {
|
||||||
|
DynamoDBLeaseRefresher leaseRefresherMock = mock(DynamoDBLeaseRefresher.class, Mockito.RETURNS_MOCKS);
|
||||||
|
createRenewer(leaseRefresherMock);
|
||||||
|
|
||||||
|
final String leaseKey = "leaseToUpdate";
|
||||||
|
final Lease lease = createDummyLease(leaseKey, WORKER_ID);
|
||||||
|
leaseRenewer.addLeasesToRenew(ImmutableList.of(lease));
|
||||||
|
final Lease updatedLease = createDummyLease(leaseKey, WORKER_ID);
|
||||||
|
updatedLease.checkpoint(LATEST);
|
||||||
|
|
||||||
|
when(leaseRefresherMock.updateLease(updatedLease)).thenThrow(new DependencyException(new RuntimeException()));
|
||||||
|
try {
|
||||||
|
final UUID concurrencyToken =
|
||||||
|
leaseRenewer.getCurrentlyHeldLease(leaseKey).concurrencyToken();
|
||||||
|
leaseRenewer.updateLease(updatedLease, concurrencyToken, "test", "dummyShardId");
|
||||||
|
fail();
|
||||||
|
} catch (DependencyException e) {
|
||||||
|
// expected
|
||||||
|
}
|
||||||
|
final Lease currentLease = leaseRenewer.getCurrentlyHeldLeases().get(leaseKey);
|
||||||
|
assertEquals(123L, currentLease.leaseCounter()); // leaseCounter should not be incremented due to DDB failure
|
||||||
|
assertEquals(TRIM_HORIZON, currentLease.checkpoint());
|
||||||
|
}
|
||||||
|
|
||||||
private void createAndPutBadLeaseEntryInTable() {
|
private void createAndPutBadLeaseEntryInTable() {
|
||||||
final PutItemRequest putItemRequest = PutItemRequest.builder()
|
final PutItemRequest putItemRequest = PutItemRequest.builder()
|
||||||
|
|
@ -198,7 +225,7 @@ class DynamoDBLeaseRenewerTest {
|
||||||
dynamoDbAsyncClient.putItem(putItemRequest);
|
dynamoDbAsyncClient.putItem(putItemRequest);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void createRenewer() throws Exception {
|
private void createRenewer(final DynamoDBLeaseRefresher leaseRefresher) throws Exception {
|
||||||
when(mockExecutorService.submit(any(Callable.class))).thenAnswer(invocation -> {
|
when(mockExecutorService.submit(any(Callable.class))).thenAnswer(invocation -> {
|
||||||
this.leaseRenewalCallable = (Callable) invocation.getArguments()[0];
|
this.leaseRenewalCallable = (Callable) invocation.getArguments()[0];
|
||||||
return mockFuture;
|
return mockFuture;
|
||||||
|
|
@ -212,7 +239,7 @@ class DynamoDBLeaseRenewerTest {
|
||||||
new NullMetricsFactory(),
|
new NullMetricsFactory(),
|
||||||
leaseStatsRecorder,
|
leaseStatsRecorder,
|
||||||
mockLeaseGracefulShutdownCallBack);
|
mockLeaseGracefulShutdownCallBack);
|
||||||
this.leaseRefresher.createLeaseTableIfNotExists();
|
leaseRefresher.createLeaseTableIfNotExists();
|
||||||
this.leaseRefresher.waitUntilLeaseTableExists(1, 30);
|
leaseRefresher.waitUntilLeaseTableExists(1, 30);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue