Reuse listLeasesParallely api to dynamically calculate total segment

This commit is contained in:
eha sah 2025-03-07 12:14:12 -08:00
parent 8e0a5e5b13
commit 5499da347a
5 changed files with 54 additions and 35 deletions

View file

@ -684,8 +684,8 @@ public final class LeaseAssignmentManager {
} }
private CompletableFuture<Map.Entry<List<Lease>, List<String>>> loadLeaseListAsync() { private CompletableFuture<Map.Entry<List<Lease>, List<String>>> loadLeaseListAsync() {
return CompletableFuture.supplyAsync(() -> loadWithRetry(() -> return CompletableFuture.supplyAsync(() ->
leaseRefresher.listLeasesParallelyWithDynamicTotalSegments(LEASE_ASSIGNMENT_CALL_THREAD_POOL))); loadWithRetry(() -> leaseRefresher.listLeasesParallely(LEASE_ASSIGNMENT_CALL_THREAD_POOL, 0)));
} }
private <T> T loadWithRetry(final Callable<T> loadFunction) { private <T> T loadWithRetry(final Callable<T> loadFunction) {

View file

@ -150,29 +150,13 @@ public interface LeaseRefresher {
*/ */
List<Lease> listLeases() throws DependencyException, InvalidStateException, ProvisionedThroughputException; List<Lease> listLeases() throws DependencyException, InvalidStateException, ProvisionedThroughputException;
/**
* List all leases from the storage parallely by dynamically calculating total segments and
* deserialize into Lease objects. Returns the list of leaseKey
* that failed deserialize separately.
*
* @param threadPool thread pool to use for parallel scan
* @return Pair of List of leases from the storage and List of items failed to deserialize
* @throws DependencyException if DynamoDB scan fails in an unexpected way
* @throws InvalidStateException if lease table does not exist
* @throws ProvisionedThroughputException if DynamoDB scan fails due to lack of capacity
*/
default Map.Entry<List<Lease>, List<String>> listLeasesParallelyWithDynamicTotalSegments(
final ExecutorService threadPool)
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
throw new UnsupportedOperationException("listLeasesParallelyWithDynamicTotalSegments is not implemented");
}
/** /**
* List all leases from the storage parallely and deserialize into Lease objects. Returns the list of leaseKey * List all leases from the storage parallely and deserialize into Lease objects. Returns the list of leaseKey
* that failed deserialize separately. * that failed deserialize separately.
* *
* @param threadPool threadpool to use for parallel scan * @param threadPool thread pool to use for parallel scan
* @param parallelismFactor no. of parallel scans * @param parallelismFactor no. of parallel scans.
* If parallelismFactor is 0 then parallelismFactor will be calculated based on table size
* @return Pair of List of leases from the storage and List of items failed to deserialize * @return Pair of List of leases from the storage and List of items failed to deserialize
* @throws DependencyException if DynamoDB scan fails in an unexpected way * @throws DependencyException if DynamoDB scan fails in an unexpected way
* @throws InvalidStateException if lease table does not exist * @throws InvalidStateException if lease table does not exist

View file

@ -561,13 +561,6 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
return list(null, null); return list(null, null);
} }
@Override
public Map.Entry<List<Lease>, List<String>> listLeasesParallelyWithDynamicTotalSegments(
final ExecutorService parallelScanExecutorService)
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
return listLeasesParallely(parallelScanExecutorService, getParallelScanTotalSegments());
}
@Override @Override
public Map.Entry<List<Lease>, List<String>> listLeasesParallely( public Map.Entry<List<Lease>, List<String>> listLeasesParallely(
final ExecutorService parallelScanExecutorService, final int parallelScanTotalSegment) final ExecutorService parallelScanExecutorService, final int parallelScanTotalSegment)
@ -575,9 +568,17 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
final List<String> leaseItemFailedDeserialize = new ArrayList<>(); final List<String> leaseItemFailedDeserialize = new ArrayList<>();
final List<Lease> response = new ArrayList<>(); final List<Lease> response = new ArrayList<>();
final List<Future<List<Map<String, AttributeValue>>>> futures = new ArrayList<>(); final List<Future<List<Map<String, AttributeValue>>>> futures = new ArrayList<>();
for (int i = 0; i < parallelScanTotalSegment; ++i) {
final int totalSegments;
if (parallelScanTotalSegment > 0) {
totalSegments = parallelScanTotalSegment;
} else {
totalSegments = getParallelScanTotalSegments();
}
for (int i = 0; i < totalSegments; ++i) {
final int segmentNumber = i; final int segmentNumber = i;
futures.add(parallelScanExecutorService.submit(() -> scanSegment(segmentNumber, parallelScanTotalSegment))); futures.add(parallelScanExecutorService.submit(() -> scanSegment(segmentNumber, totalSegments)));
} }
try { try {
for (final Future<List<Map<String, AttributeValue>>> future : futures) { for (final Future<List<Map<String, AttributeValue>>> future : futures) {

View file

@ -44,8 +44,10 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
@ -729,8 +731,7 @@ class LeaseAssignmentManagerTest {
final WorkerMetricStatsDAO mockedWorkerMetricsDAO = Mockito.mock(WorkerMetricStatsDAO.class); final WorkerMetricStatsDAO mockedWorkerMetricsDAO = Mockito.mock(WorkerMetricStatsDAO.class);
final LeaseRefresher mockedLeaseRefresher = Mockito.mock(LeaseRefresher.class); final LeaseRefresher mockedLeaseRefresher = Mockito.mock(LeaseRefresher.class);
when(mockedLeaseRefresher.listLeasesParallelyWithDynamicTotalSegments(any())) when(mockedLeaseRefresher.listLeasesParallely(any(), anyInt())).thenThrow(new RuntimeException());
.thenThrow(new RuntimeException());
when(mockedWorkerMetricsDAO.getAllWorkerMetricStats()).thenThrow(new RuntimeException()); when(mockedWorkerMetricsDAO.getAllWorkerMetricStats()).thenThrow(new RuntimeException());
final LeaseAssignmentManager leaseAssignmentManager = new LeaseAssignmentManager( final LeaseAssignmentManager leaseAssignmentManager = new LeaseAssignmentManager(
@ -752,7 +753,40 @@ class LeaseAssignmentManagerTest {
leaseAssignmentManagerRunnable.run(); leaseAssignmentManagerRunnable.run();
verify(mockedLeaseRefresher, times(2)).listLeasesParallelyWithDynamicTotalSegments(any()); verify(mockedLeaseRefresher, times(2)).listLeasesParallely(any(), anyInt());
verify(mockedWorkerMetricsDAO, times(2)).getAllWorkerMetricStats();
}
@Test
void performAssignment_testRetryBehavior_withDynamicTotalScanSegments()
throws ProvisionedThroughputException, InvalidStateException, DependencyException {
final WorkerMetricStatsDAO mockedWorkerMetricsDAO = Mockito.mock(WorkerMetricStatsDAO.class);
final LeaseRefresher mockedLeaseRefresher = Mockito.mock(LeaseRefresher.class);
when(mockedLeaseRefresher.listLeasesParallely(any(), eq(0))).thenThrow(new RuntimeException());
when(mockedWorkerMetricsDAO.getAllWorkerMetricStats()).thenThrow(new RuntimeException());
final LeaseAssignmentManager leaseAssignmentManager = new LeaseAssignmentManager(
mockedLeaseRefresher,
mockedWorkerMetricsDAO,
mockLeaderDecider,
getWorkerUtilizationAwareAssignmentConfig(Double.MAX_VALUE, 20),
TEST_LEADER_WORKER_ID,
100L,
new NullMetricsFactory(),
scheduledExecutorService,
System::nanoTime,
Integer.MAX_VALUE,
LeaseManagementConfig.GracefulLeaseHandoffConfig.builder()
.isGracefulLeaseHandoffEnabled(false)
.build());
leaseAssignmentManager.start();
leaseAssignmentManagerRunnable.run();
verify(mockedLeaseRefresher, times(2)).listLeasesParallely(any(), eq(0));
verify(mockedWorkerMetricsDAO, times(2)).getAllWorkerMetricStats(); verify(mockedWorkerMetricsDAO, times(2)).getAllWorkerMetricStats();
} }

View file

@ -401,7 +401,7 @@ class DynamoDBLeaseRefresherTest {
leaseRefresher.createLeaseIfNotExists(createDummyLease("lease1", "leaseOwner1")); leaseRefresher.createLeaseIfNotExists(createDummyLease("lease1", "leaseOwner1"));
leaseRefresher.createLeaseIfNotExists(createDummyLease("lease2", "leaseOwner2")); leaseRefresher.createLeaseIfNotExists(createDummyLease("lease2", "leaseOwner2"));
final Map.Entry<List<Lease>, List<String>> response = final Map.Entry<List<Lease>, List<String>> response =
leaseRefresher.listLeasesParallelyWithDynamicTotalSegments(Executors.newFixedThreadPool(2)); leaseRefresher.listLeasesParallely(Executors.newFixedThreadPool(2), 0);
assertEquals(2, response.getKey().size()); assertEquals(2, response.getKey().size());
assertEquals(0, response.getValue().size()); assertEquals(0, response.getValue().size());
} }
@ -414,7 +414,7 @@ class DynamoDBLeaseRefresherTest {
leaseRefresher.createLeaseIfNotExists(createDummyLease("lease1", "leaseOwner1")); leaseRefresher.createLeaseIfNotExists(createDummyLease("lease1", "leaseOwner1"));
createAndPutBadLeaseEntryInTable(); createAndPutBadLeaseEntryInTable();
final Map.Entry<List<Lease>, List<String>> response = final Map.Entry<List<Lease>, List<String>> response =
leaseRefresher.listLeasesParallelyWithDynamicTotalSegments(Executors.newFixedThreadPool(2)); leaseRefresher.listLeasesParallely(Executors.newFixedThreadPool(2), 0);
assertEquals(1, response.getKey().size()); assertEquals(1, response.getKey().size());
assertEquals("lease1", response.getKey().get(0).leaseKey()); assertEquals("lease1", response.getKey().get(0).leaseKey());
assertEquals(1, response.getValue().size()); assertEquals(1, response.getValue().size());