From 5499da347a5cb3fce4f5a623651c1c803b002f98 Mon Sep 17 00:00:00 2001 From: eha sah Date: Fri, 7 Mar 2025 12:14:12 -0800 Subject: [PATCH] Reuse listLeasesParallely api to dynamically calculate total segment --- .../assignment/LeaseAssignmentManager.java | 4 +- .../amazon/kinesis/leases/LeaseRefresher.java | 22 ++-------- .../dynamodb/DynamoDBLeaseRefresher.java | 19 ++++----- .../LeaseAssignmentManagerTest.java | 40 +++++++++++++++++-- .../dynamodb/DynamoDBLeaseRefresherTest.java | 4 +- 5 files changed, 54 insertions(+), 35 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManager.java index 4c78043b..f5ea5470 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManager.java @@ -684,8 +684,8 @@ public final class LeaseAssignmentManager { } private CompletableFuture, List>> loadLeaseListAsync() { - return CompletableFuture.supplyAsync(() -> loadWithRetry(() -> - leaseRefresher.listLeasesParallelyWithDynamicTotalSegments(LEASE_ASSIGNMENT_CALL_THREAD_POOL))); + return CompletableFuture.supplyAsync(() -> + loadWithRetry(() -> leaseRefresher.listLeasesParallely(LEASE_ASSIGNMENT_CALL_THREAD_POOL, 0))); } private T loadWithRetry(final Callable loadFunction) { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java index 08ebf0cb..49bec215 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java @@ -150,29 +150,13 @@ public interface LeaseRefresher { */ List listLeases() throws DependencyException, InvalidStateException, ProvisionedThroughputException; - /** - * List all leases from the storage parallely by dynamically calculating total segments and - * deserialize into Lease objects. Returns the list of leaseKey - * that failed deserialize separately. - * - * @param threadPool thread pool to use for parallel scan - * @return Pair of List of leases from the storage and List of items failed to deserialize - * @throws DependencyException if DynamoDB scan fails in an unexpected way - * @throws InvalidStateException if lease table does not exist - * @throws ProvisionedThroughputException if DynamoDB scan fails due to lack of capacity - */ - default Map.Entry, List> listLeasesParallelyWithDynamicTotalSegments( - final ExecutorService threadPool) - throws DependencyException, InvalidStateException, ProvisionedThroughputException { - throw new UnsupportedOperationException("listLeasesParallelyWithDynamicTotalSegments is not implemented"); - } - /** * List all leases from the storage parallely and deserialize into Lease objects. Returns the list of leaseKey * that failed deserialize separately. * - * @param threadPool threadpool to use for parallel scan - * @param parallelismFactor no. of parallel scans + * @param threadPool thread pool to use for parallel scan + * @param parallelismFactor no. of parallel scans. + * If parallelismFactor is 0 then parallelismFactor will be calculated based on table size * @return Pair of List of leases from the storage and List of items failed to deserialize * @throws DependencyException if DynamoDB scan fails in an unexpected way * @throws InvalidStateException if lease table does not exist diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java index 6db19b7f..c651af91 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java @@ -561,13 +561,6 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { return list(null, null); } - @Override - public Map.Entry, List> listLeasesParallelyWithDynamicTotalSegments( - final ExecutorService parallelScanExecutorService) - throws DependencyException, InvalidStateException, ProvisionedThroughputException { - return listLeasesParallely(parallelScanExecutorService, getParallelScanTotalSegments()); - } - @Override public Map.Entry, List> listLeasesParallely( final ExecutorService parallelScanExecutorService, final int parallelScanTotalSegment) @@ -575,9 +568,17 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { final List leaseItemFailedDeserialize = new ArrayList<>(); final List response = new ArrayList<>(); final List>>> futures = new ArrayList<>(); - for (int i = 0; i < parallelScanTotalSegment; ++i) { + + final int totalSegments; + if (parallelScanTotalSegment > 0) { + totalSegments = parallelScanTotalSegment; + } else { + totalSegments = getParallelScanTotalSegments(); + } + + for (int i = 0; i < totalSegments; ++i) { final int segmentNumber = i; - futures.add(parallelScanExecutorService.submit(() -> scanSegment(segmentNumber, parallelScanTotalSegment))); + futures.add(parallelScanExecutorService.submit(() -> scanSegment(segmentNumber, totalSegments))); } try { for (final Future>> future : futures) { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManagerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManagerTest.java index 1c48ed93..21c54e75 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManagerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManagerTest.java @@ -44,8 +44,10 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.times; @@ -729,8 +731,7 @@ class LeaseAssignmentManagerTest { final WorkerMetricStatsDAO mockedWorkerMetricsDAO = Mockito.mock(WorkerMetricStatsDAO.class); final LeaseRefresher mockedLeaseRefresher = Mockito.mock(LeaseRefresher.class); - when(mockedLeaseRefresher.listLeasesParallelyWithDynamicTotalSegments(any())) - .thenThrow(new RuntimeException()); + when(mockedLeaseRefresher.listLeasesParallely(any(), anyInt())).thenThrow(new RuntimeException()); when(mockedWorkerMetricsDAO.getAllWorkerMetricStats()).thenThrow(new RuntimeException()); final LeaseAssignmentManager leaseAssignmentManager = new LeaseAssignmentManager( @@ -752,7 +753,40 @@ class LeaseAssignmentManagerTest { leaseAssignmentManagerRunnable.run(); - verify(mockedLeaseRefresher, times(2)).listLeasesParallelyWithDynamicTotalSegments(any()); + verify(mockedLeaseRefresher, times(2)).listLeasesParallely(any(), anyInt()); + verify(mockedWorkerMetricsDAO, times(2)).getAllWorkerMetricStats(); + } + + @Test + void performAssignment_testRetryBehavior_withDynamicTotalScanSegments() + throws ProvisionedThroughputException, InvalidStateException, DependencyException { + + final WorkerMetricStatsDAO mockedWorkerMetricsDAO = Mockito.mock(WorkerMetricStatsDAO.class); + final LeaseRefresher mockedLeaseRefresher = Mockito.mock(LeaseRefresher.class); + + when(mockedLeaseRefresher.listLeasesParallely(any(), eq(0))).thenThrow(new RuntimeException()); + when(mockedWorkerMetricsDAO.getAllWorkerMetricStats()).thenThrow(new RuntimeException()); + + final LeaseAssignmentManager leaseAssignmentManager = new LeaseAssignmentManager( + mockedLeaseRefresher, + mockedWorkerMetricsDAO, + mockLeaderDecider, + getWorkerUtilizationAwareAssignmentConfig(Double.MAX_VALUE, 20), + TEST_LEADER_WORKER_ID, + 100L, + new NullMetricsFactory(), + scheduledExecutorService, + System::nanoTime, + Integer.MAX_VALUE, + LeaseManagementConfig.GracefulLeaseHandoffConfig.builder() + .isGracefulLeaseHandoffEnabled(false) + .build()); + + leaseAssignmentManager.start(); + + leaseAssignmentManagerRunnable.run(); + + verify(mockedLeaseRefresher, times(2)).listLeasesParallely(any(), eq(0)); verify(mockedWorkerMetricsDAO, times(2)).getAllWorkerMetricStats(); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java index 20c48ad4..218d9e54 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java @@ -401,7 +401,7 @@ class DynamoDBLeaseRefresherTest { leaseRefresher.createLeaseIfNotExists(createDummyLease("lease1", "leaseOwner1")); leaseRefresher.createLeaseIfNotExists(createDummyLease("lease2", "leaseOwner2")); final Map.Entry, List> response = - leaseRefresher.listLeasesParallelyWithDynamicTotalSegments(Executors.newFixedThreadPool(2)); + leaseRefresher.listLeasesParallely(Executors.newFixedThreadPool(2), 0); assertEquals(2, response.getKey().size()); assertEquals(0, response.getValue().size()); } @@ -414,7 +414,7 @@ class DynamoDBLeaseRefresherTest { leaseRefresher.createLeaseIfNotExists(createDummyLease("lease1", "leaseOwner1")); createAndPutBadLeaseEntryInTable(); final Map.Entry, List> response = - leaseRefresher.listLeasesParallelyWithDynamicTotalSegments(Executors.newFixedThreadPool(2)); + leaseRefresher.listLeasesParallely(Executors.newFixedThreadPool(2), 0); assertEquals(1, response.getKey().size()); assertEquals("lease1", response.getKey().get(0).leaseKey()); assertEquals(1, response.getValue().size());