Fixes DDB usage spike issue

This commit is contained in:
eha sah 2025-03-03 10:03:52 -08:00
parent c9563ab585
commit 4d7392f04e
5 changed files with 107 additions and 10 deletions

View file

@ -86,11 +86,6 @@ public final class LeaseAssignmentManager {
*/
private static final int DEFAULT_LEASE_ASSIGNMENT_MANAGER_FREQ_MULTIPLIER = 2;
/**
* Default parallelism factor for scaling lease table.
*/
private static final int DEFAULT_LEASE_TABLE_SCAN_PARALLELISM_FACTOR = 10;
private static final String FORCE_LEADER_RELEASE_METRIC_NAME = "ForceLeaderRelease";
/**
@ -689,8 +684,8 @@ public final class LeaseAssignmentManager {
}
/**
* Starts an asynchronous load of the lease table contents.
* The work is submitted through {@code CompletableFuture.supplyAsync} and the lease listing call is wrapped in
* {@code loadWithRetry}; the scan itself is handed {@code LEASE_ASSIGNMENT_CALL_THREAD_POOL}.
*
* @return future yielding an entry whose key is the list of leases and whose value is a list of strings
*         (presumably the lease keys that failed deserialization — confirm against LeaseRefresher)
*/
private CompletableFuture<Map.Entry<List<Lease>, List<String>>> loadLeaseListAsync() {
// NOTE(review): two return statements are visible here; this first one (fixed segment count) looks like the
// pre-change line of the diff hunk and the second one like its replacement — confirm against the real file.
return CompletableFuture.supplyAsync(() -> loadWithRetry(() -> leaseRefresher.listLeasesParallely(
LEASE_ASSIGNMENT_CALL_THREAD_POOL, DEFAULT_LEASE_TABLE_SCAN_PARALLELISM_FACTOR)));
// Variant that lets the lease refresher choose the number of scan segments itself.
return CompletableFuture.supplyAsync(() -> loadWithRetry(() ->
leaseRefresher.listLeasesParallelyWithDynamicTotalSegments(LEASE_ASSIGNMENT_CALL_THREAD_POOL)));
}
private <T> T loadWithRetry(final Callable<T> loadFunction) {

View file

@ -150,6 +150,23 @@ public interface LeaseRefresher {
*/
List<Lease> listLeases() throws DependencyException, InvalidStateException, ProvisionedThroughputException;
/**
 * Lists every lease in the storage using a parallel scan whose total segment count is determined
 * dynamically instead of being supplied by the caller, and deserializes the scanned items into Lease
 * objects. Lease keys of items that fail to deserialize are returned separately.
 *
 * <p>This default implementation performs no scan and always throws
 * {@link UnsupportedOperationException}; implementations that support the operation override it.
 *
 * @param threadPool thread pool to use for parallel scan
 * @return pair of the list of leases read from the storage and the list of lease keys that failed to
 *         deserialize
 * @throws DependencyException if DynamoDB scan fails in an unexpected way
 * @throws InvalidStateException if lease table does not exist
 * @throws ProvisionedThroughputException if DynamoDB scan fails due to lack of capacity
 */
default Map.Entry<List<Lease>, List<String>> listLeasesParallelyWithDynamicTotalSegments(
        final ExecutorService threadPool)
        throws DependencyException, InvalidStateException, ProvisionedThroughputException {
    final String notImplementedMessage = "listLeasesParallelyWithDynamicTotalSegments is not implemented";
    throw new UnsupportedOperationException(notImplementedMessage);
}
/**
* List all leases from the storage parallely and deserialize into Lease objects. Returns the list of leaseKey
* that failed deserialize separately.

View file

@ -122,6 +122,16 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
private static final String LEASE_OWNER_INDEX_QUERY_CONDITIONAL_EXPRESSION =
String.format("%s = %s", LEASE_OWNER_KEY, DDB_LEASE_OWNER);
/**
* Default number of segments for a parallel scan of the lease table. Used as the fallback when a segment
* count is not derived from the table size.
*/
private static final int DEFAULT_LEASE_TABLE_SCAN_PARALLELISM_FACTOR = 10;
// Number of bytes in one GB (1024^3); divides the table size in bytes to express it in GB.
private static final long NUMBER_OF_BYTES_PER_GB = 1024 * 1024 * 1024;
// Amount of table data, in GB, that a single scan segment is sized for.
private static final double GB_PER_SEGMENT = 0.2;
// Lower bound applied to the computed segment count.
private static final int MIN_SCAN_SEGMENTS = 1;
// Upper bound applied to the computed segment count.
// NOTE(review): presumably mirrors DynamoDB's documented maximum for Scan TotalSegments — confirm.
private static final int MAX_SCAN_SEGMENTS = 1000000;
private static DdbTableConfig createDdbTableConfigFromBillingMode(final BillingMode billingMode) {
final DdbTableConfig tableConfig = new DdbTableConfig();
tableConfig.billingMode(billingMode);
@ -546,6 +556,24 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
return list(null, null);
}
/**
 * Lists all leases with a parallel scan whose segment count is derived from the lease table size reported
 * by DescribeTable. When no table size is available (no DescribeTable response, no table description, or
 * no size in the description) the scan falls back to
 * {@code DEFAULT_LEASE_TABLE_SCAN_PARALLELISM_FACTOR} segments.
 *
 * @param parallelScanExecutorService executor passed on to {@code listLeasesParallely} for the scan
 * @return result of {@code listLeasesParallely} for the chosen segment count
 * @throws DependencyException declared by the delegate calls
 * @throws InvalidStateException declared by the delegate calls
 * @throws ProvisionedThroughputException declared by the delegate calls
 */
@Override
public Map.Entry<List<Lease>, List<String>> listLeasesParallelyWithDynamicTotalSegments(
        final ExecutorService parallelScanExecutorService)
        throws DependencyException, InvalidStateException, ProvisionedThroughputException {
    int parallelScanTotalSegment = DEFAULT_LEASE_TABLE_SCAN_PARALLELISM_FACTOR;
    final DescribeTableResponse describeTableResponse = describeLeaseTable();

    // The table description and its size are object-typed getters and may be null; passing a null Long to
    // getParallelScanTotalSegments(double) would fail with a NullPointerException during unboxing, so
    // resolve the size defensively first.
    final Long tableSizeBytes = (describeTableResponse == null || describeTableResponse.table() == null)
            ? null
            : describeTableResponse.table().tableSizeBytes();

    if (tableSizeBytes != null) {
        parallelScanTotalSegment = getParallelScanTotalSegments(tableSizeBytes);
    } else {
        log.info(
                "DescribeTable did not return a table size so using default totalSegments : {}",
                parallelScanTotalSegment);
    }
    return listLeasesParallely(parallelScanExecutorService, parallelScanTotalSegment);
}
@Override
public Map.Entry<List<Lease>, List<String>> listLeasesParallely(
final ExecutorService parallelScanExecutorService, final int parallelScanTotalSegment)
@ -586,6 +614,35 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
return new AbstractMap.SimpleEntry<>(response, leaseItemFailedDeserialize);
}
/**
 * Calculates the number of parallel scan segments for a DynamoDB table based on its size.
 * The calculation follows these rules:
 * - Each segment handles 0.2GB (214,748,364 bytes) of data
 * - For empty tables or tables smaller than 0.2GB, uses 1 segment
 * - Number of segments scales linearly with table size
 * - The result is clamped to the range [MIN_SCAN_SEGMENTS, MAX_SCAN_SEGMENTS]
 *
 * @param tableSizeBytes size of the table in bytes
 * @return The number of segments to use for parallel scan, minimum 1
 */
private int getParallelScanTotalSegments(double tableSizeBytes) {
    final double tableSizeGB = tableSizeBytes / NUMBER_OF_BYTES_PER_GB;
    // Floating-point division, Math.ceil and the narrowing cast never throw: NaN casts to 0 and
    // out-of-range values saturate at the int limits. The clamp below maps every such result back into the
    // valid range, so no exception handling is needed here.
    final int unclampedSegments = (int) Math.ceil(tableSizeGB / GB_PER_SEGMENT);
    final int totalSegments = Math.min(Math.max(unclampedSegments, MIN_SCAN_SEGMENTS), MAX_SCAN_SEGMENTS);
    log.info("TotalSegments for Lease table parallel scan : {}", totalSegments);
    return totalSegments;
}
private List<Map<String, AttributeValue>> scanSegment(final int segment, final int parallelScanTotalSegment)
throws DependencyException {

View file

@ -46,7 +46,6 @@ import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.times;
@ -730,7 +729,8 @@ class LeaseAssignmentManagerTest {
final WorkerMetricStatsDAO mockedWorkerMetricsDAO = Mockito.mock(WorkerMetricStatsDAO.class);
final LeaseRefresher mockedLeaseRefresher = Mockito.mock(LeaseRefresher.class);
when(mockedLeaseRefresher.listLeasesParallely(any(), anyInt())).thenThrow(new RuntimeException());
when(mockedLeaseRefresher.listLeasesParallelyWithDynamicTotalSegments(any()))
.thenThrow(new RuntimeException());
when(mockedWorkerMetricsDAO.getAllWorkerMetricStats()).thenThrow(new RuntimeException());
final LeaseAssignmentManager leaseAssignmentManager = new LeaseAssignmentManager(
@ -752,7 +752,7 @@ class LeaseAssignmentManagerTest {
leaseAssignmentManagerRunnable.run();
verify(mockedLeaseRefresher, times(2)).listLeasesParallely(any(), anyInt());
verify(mockedLeaseRefresher, times(2)).listLeasesParallelyWithDynamicTotalSegments(any());
verify(mockedWorkerMetricsDAO, times(2)).getAllWorkerMetricStats();
}

View file

@ -393,6 +393,34 @@ class DynamoDBLeaseRefresherTest {
assertEquals("badLeaseKey", response.getValue().get(0));
}
/**
 * Creates two valid leases and verifies that the dynamic-segment parallel listing returns both of them and
 * reports no deserialization failures.
 */
@Test
void listLeasesParallelyWithDynamicTotalSegments_sanity()
        throws ProvisionedThroughputException, DependencyException, InvalidStateException {
    final DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient);
    setupTable(leaseRefresher);
    leaseRefresher.createLeaseIfNotExists(createDummyLease("lease1", "leaseOwner1"));
    leaseRefresher.createLeaseIfNotExists(createDummyLease("lease2", "leaseOwner2"));

    // Keep a handle on the pool so its threads are released even when an assertion fails.
    final java.util.concurrent.ExecutorService scanExecutor = Executors.newFixedThreadPool(2);
    try {
        final Map.Entry<List<Lease>, List<String>> response =
                leaseRefresher.listLeasesParallelyWithDynamicTotalSegments(scanExecutor);
        assertEquals(2, response.getKey().size());
        assertEquals(0, response.getValue().size());
    } finally {
        scanExecutor.shutdownNow();
    }
}
/**
 * Creates one valid lease plus one bad table entry and verifies that the dynamic-segment parallel listing
 * returns the valid lease and reports the bad entry's key ("badLeaseKey") as failed to deserialize.
 */
@Test
void listLeasesParallelyWithDynamicTotalSegments_leaseWithFailingDeserialization_assertCorrectResponse()
        throws ProvisionedThroughputException, DependencyException, InvalidStateException {
    final DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient);
    setupTable(leaseRefresher);
    leaseRefresher.createLeaseIfNotExists(createDummyLease("lease1", "leaseOwner1"));
    createAndPutBadLeaseEntryInTable();

    // Keep a handle on the pool so its threads are released even when an assertion fails.
    final java.util.concurrent.ExecutorService scanExecutor = Executors.newFixedThreadPool(2);
    try {
        final Map.Entry<List<Lease>, List<String>> response =
                leaseRefresher.listLeasesParallelyWithDynamicTotalSegments(scanExecutor);
        assertEquals(1, response.getKey().size());
        assertEquals("lease1", response.getKey().get(0).leaseKey());
        assertEquals(1, response.getValue().size());
        assertEquals("badLeaseKey", response.getValue().get(0));
    } finally {
        scanExecutor.shutdownNow();
    }
}
@Test
void initiateGracefulLeaseHandoff_sanity() throws Exception {
DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient);