Added unit tests and made getParallelScanTotalSegments synchronous

This commit is contained in:
eha sah 2025-03-07 17:41:04 -08:00
parent 57ad580bd9
commit cad4467c22
3 changed files with 161 additions and 69 deletions

View file

@ -133,9 +133,10 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
private static final int MIN_SCAN_SEGMENTS = 1;
private static final int MAX_SCAN_SEGMENTS = 30;
private volatile Integer cachedTotalSegments;
private volatile Instant expirationTimeForTotalSegmentsCache;
private Integer cachedTotalSegments;
private Instant expirationTimeForTotalSegmentsCache;
private static final Duration CACHE_DURATION_FOR_TOTAL_SEGMENTS = Duration.ofHours(2);
private final Object lock = new Object();
private static DdbTableConfig createDdbTableConfigFromBillingMode(final BillingMode billingMode) {
final DdbTableConfig tableConfig = new DdbTableConfig();
@ -620,23 +621,25 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
*/
private int getParallelScanTotalSegments() throws DependencyException {
if (isTotalSegmentsCacheValid()) {
log.info("Cached value used : TotalSegments for Lease table parallel scan : {}", cachedTotalSegments);
return cachedTotalSegments;
}
int parallelScanTotalSegments = DEFAULT_LEASE_TABLE_SCAN_PARALLELISM_FACTOR;
synchronized(lock) {
if (isTotalSegmentsCacheValid()) {
return cachedTotalSegments;
}
DescribeTableResponse describeTableResponse = describeLeaseTable();
if (describeTableResponse == null) {
log.info("DescribeTable returned null so using default totalSegments : {}", parallelScanTotalSegments);
} else {
final double tableSizeGB = (double) describeTableResponse.table().tableSizeBytes() / NUMBER_OF_BYTES_PER_GB;
parallelScanTotalSegments = Math.min(
Math.max((int) Math.ceil(tableSizeGB / GB_PER_SEGMENT), MIN_SCAN_SEGMENTS), MAX_SCAN_SEGMENTS);
log.info("TotalSegments for Lease table parallel scan : {}", parallelScanTotalSegments);
}
cachedTotalSegments = parallelScanTotalSegments;
expirationTimeForTotalSegmentsCache = Instant.now().plus(CACHE_DURATION_FOR_TOTAL_SEGMENTS);
}
log.info("TotalSegments for Lease table parallel scan : {}", parallelScanTotalSegments);
return parallelScanTotalSegments;
}

View file

@ -44,7 +44,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyInt;
@ -757,39 +756,6 @@ class LeaseAssignmentManagerTest {
verify(mockedWorkerMetricsDAO, times(2)).getAllWorkerMetricStats();
}
@Test
void performAssignment_testRetryBehavior_withDynamicTotalScanSegments()
throws ProvisionedThroughputException, InvalidStateException, DependencyException {
final WorkerMetricStatsDAO mockedWorkerMetricsDAO = Mockito.mock(WorkerMetricStatsDAO.class);
final LeaseRefresher mockedLeaseRefresher = Mockito.mock(LeaseRefresher.class);
when(mockedLeaseRefresher.listLeasesParallely(any(), eq(0))).thenThrow(new RuntimeException());
when(mockedWorkerMetricsDAO.getAllWorkerMetricStats()).thenThrow(new RuntimeException());
final LeaseAssignmentManager leaseAssignmentManager = new LeaseAssignmentManager(
mockedLeaseRefresher,
mockedWorkerMetricsDAO,
mockLeaderDecider,
getWorkerUtilizationAwareAssignmentConfig(Double.MAX_VALUE, 20),
TEST_LEADER_WORKER_ID,
100L,
new NullMetricsFactory(),
scheduledExecutorService,
System::nanoTime,
Integer.MAX_VALUE,
LeaseManagementConfig.GracefulLeaseHandoffConfig.builder()
.isGracefulLeaseHandoffEnabled(false)
.build());
leaseAssignmentManager.start();
leaseAssignmentManagerRunnable.run();
verify(mockedLeaseRefresher, times(2)).listLeasesParallely(any(), eq(0));
verify(mockedWorkerMetricsDAO, times(2)).getAllWorkerMetricStats();
}
@Test
void performAssignment_invalidLeaseInTable_validateAssignmentDoesNotFail() throws Exception {
createLeaseAssignmentManager(

View file

@ -11,6 +11,8 @@ import java.util.concurrent.Executors;
import com.amazonaws.services.dynamodbv2.local.embedded.DynamoDBEmbedded;
import com.google.common.collect.ImmutableMap;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.mockito.Mockito;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
@ -22,6 +24,8 @@ import software.amazon.awssdk.services.dynamodb.model.GlobalSecondaryIndexDescri
import software.amazon.awssdk.services.dynamodb.model.IndexStatus;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
import software.amazon.awssdk.services.dynamodb.model.TableDescription;
import software.amazon.awssdk.services.dynamodb.model.TableStatus;
import software.amazon.awssdk.services.dynamodb.model.UpdateContinuousBackupsRequest;
@ -394,31 +398,150 @@ class DynamoDBLeaseRefresherTest {
}
@Test
void listLeasesParallelyWithDynamicTotalSegments_sanity()
public void listLeasesParallely_UseCachedTotalSegment()
throws ProvisionedThroughputException, DependencyException, InvalidStateException {
DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient);
setupTable(leaseRefresher);
leaseRefresher.createLeaseIfNotExists(createDummyLease("lease1", "leaseOwner1"));
leaseRefresher.createLeaseIfNotExists(createDummyLease("lease2", "leaseOwner2"));
final Map.Entry<List<Lease>, List<String>> response =
DynamoDbAsyncClient mockDdbClient = mock(DynamoDbAsyncClient.class);
final long oneGB_InBytes = 1073741824L;
when(mockDdbClient.describeTable(any(DescribeTableRequest.class)))
.thenReturn(CompletableFuture.completedFuture(DescribeTableResponse.builder()
.table(TableDescription.builder()
.tableName(TEST_LEASE_TABLE)
.tableStatus(TableStatus.ACTIVE)
.tableSizeBytes(oneGB_InBytes)
.build())
.build()));
when(mockDdbClient.scan(any(ScanRequest.class)))
.thenReturn(CompletableFuture.completedFuture(
ScanResponse.builder().items(new ArrayList<>()).build()));
final LeaseRefresher leaseRefresher = new DynamoDBLeaseRefresher(
TEST_LEASE_TABLE,
mockDdbClient,
new DynamoDBLeaseSerializer(),
true,
NOOP_TABLE_CREATOR_CALLBACK,
Duration.ofSeconds(10),
new DdbTableConfig(),
true,
true,
new ArrayList<>());
leaseRefresher.listLeasesParallely(Executors.newFixedThreadPool(2), 0);
assertEquals(2, response.getKey().size());
assertEquals(0, response.getValue().size());
verify(mockDdbClient, times(5)).scan(any(ScanRequest.class));
// calling second to test cached value is used
leaseRefresher.listLeasesParallely(Executors.newFixedThreadPool(2), 0);
// verify if describe table is called once even when listLeasesParallely is called twice
verify(mockDdbClient, times(1)).describeTable(any(DescribeTableRequest.class));
verify(mockDdbClient, times(10)).scan(any(ScanRequest.class));
}
@Test
void listLeasesParallelyWithDynamicTotalSegments_leaseWithFailingDeserialization_assertCorrectResponse()
public void listLeasesParallely_DescribeTableNotCalledWhenSegmentGreaterThanZero()
throws ProvisionedThroughputException, DependencyException, InvalidStateException {
DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient);
setupTable(leaseRefresher);
leaseRefresher.createLeaseIfNotExists(createDummyLease("lease1", "leaseOwner1"));
createAndPutBadLeaseEntryInTable();
final Map.Entry<List<Lease>, List<String>> response =
DynamoDbAsyncClient mockDdbClient = mock(DynamoDbAsyncClient.class);
when(mockDdbClient.describeTable(any(DescribeTableRequest.class)))
.thenReturn(CompletableFuture.completedFuture(DescribeTableResponse.builder()
.table(TableDescription.builder()
.tableName(TEST_LEASE_TABLE)
.tableStatus(TableStatus.ACTIVE)
.tableSizeBytes(1000L)
.build())
.build()));
when(mockDdbClient.scan(any(ScanRequest.class)))
.thenReturn(CompletableFuture.completedFuture(
ScanResponse.builder().items(new ArrayList<>()).build()));
final LeaseRefresher leaseRefresher = new DynamoDBLeaseRefresher(
TEST_LEASE_TABLE,
mockDdbClient,
new DynamoDBLeaseSerializer(),
true,
NOOP_TABLE_CREATOR_CALLBACK,
Duration.ofSeconds(10),
new DdbTableConfig(),
true,
true,
new ArrayList<>());
leaseRefresher.listLeasesParallely(Executors.newFixedThreadPool(2), 2);
verify(mockDdbClient, times(0)).describeTable(any(DescribeTableRequest.class));
}
@Test
public void listLeasesParallely_TotalSegmentIsDefaultWhenDescribeTableThrowsException()
throws ProvisionedThroughputException, DependencyException, InvalidStateException {
DynamoDbAsyncClient mockDdbClient = mock(DynamoDbAsyncClient.class);
when(mockDdbClient.describeTable(any(DescribeTableRequest.class)))
.thenThrow(ResourceNotFoundException.builder()
.message("Mock table does not exist scenario")
.build());
when(mockDdbClient.scan(any(ScanRequest.class)))
.thenReturn(CompletableFuture.completedFuture(
ScanResponse.builder().items(new ArrayList<>()).build()));
final LeaseRefresher leaseRefresher = new DynamoDBLeaseRefresher(
TEST_LEASE_TABLE,
mockDdbClient,
new DynamoDBLeaseSerializer(),
true,
NOOP_TABLE_CREATOR_CALLBACK,
Duration.ofSeconds(10),
new DdbTableConfig(),
true,
true,
new ArrayList<>());
leaseRefresher.listLeasesParallely(Executors.newFixedThreadPool(2), 0);
assertEquals(1, response.getKey().size());
assertEquals("lease1", response.getKey().get(0).leaseKey());
assertEquals(1, response.getValue().size());
assertEquals("badLeaseKey", response.getValue().get(0));
verify(mockDdbClient, times(10)).scan(any(ScanRequest.class));
}
@ParameterizedTest
@CsvSource({
"0, 1", // 0
"1024, 1", // 1KB
"104857600, 1", // 100MB
"214748364, 1", // 0.2GB
"322122547, 2", // 1.3GB
"1073741824, 5", // 1GB
"2147483648, 10", // 2GB
"5368709120, 25", // 5GB
})
public void listLeasesParallely_TotalSegmentForDifferentTableSize(long tableSizeBytes, int totalSegments)
throws ProvisionedThroughputException, DependencyException, InvalidStateException {
DynamoDbAsyncClient mockDdbClient = mock(DynamoDbAsyncClient.class);
when(mockDdbClient.describeTable(any(DescribeTableRequest.class)))
.thenReturn(CompletableFuture.completedFuture(DescribeTableResponse.builder()
.table(TableDescription.builder()
.tableName(TEST_LEASE_TABLE)
.tableStatus(TableStatus.ACTIVE)
.tableSizeBytes(tableSizeBytes)
.build())
.build()));
when(mockDdbClient.scan(any(ScanRequest.class)))
.thenReturn(CompletableFuture.completedFuture(
ScanResponse.builder().items(new ArrayList<>()).build()));
final LeaseRefresher leaseRefresher = new DynamoDBLeaseRefresher(
TEST_LEASE_TABLE,
mockDdbClient,
new DynamoDBLeaseSerializer(),
true,
NOOP_TABLE_CREATOR_CALLBACK,
Duration.ofSeconds(10),
new DdbTableConfig(),
true,
true,
new ArrayList<>());
leaseRefresher.listLeasesParallely(Executors.newFixedThreadPool(2), 0);
verify(mockDdbClient, times(totalSegments)).scan(any(ScanRequest.class));
}
@Test