From 4d7392f04e235b91131631bdf25f4fc1c5415bcf Mon Sep 17 00:00:00 2001 From: eha sah Date: Mon, 3 Mar 2025 10:03:52 -0800 Subject: [PATCH] Fixes DDB usage spike issue --- .../assignment/LeaseAssignmentManager.java | 9 +-- .../amazon/kinesis/leases/LeaseRefresher.java | 17 ++++++ .../dynamodb/DynamoDBLeaseRefresher.java | 57 +++++++++++++++++++ .../LeaseAssignmentManagerTest.java | 6 +- .../dynamodb/DynamoDBLeaseRefresherTest.java | 28 +++++++++ 5 files changed, 107 insertions(+), 10 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManager.java index edd6c2a9..4c78043b 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManager.java @@ -86,11 +86,6 @@ public final class LeaseAssignmentManager { */ private static final int DEFAULT_LEASE_ASSIGNMENT_MANAGER_FREQ_MULTIPLIER = 2; - /** - * Default parallelism factor for scaling lease table. - */ - private static final int DEFAULT_LEASE_TABLE_SCAN_PARALLELISM_FACTOR = 10; - private static final String FORCE_LEADER_RELEASE_METRIC_NAME = "ForceLeaderRelease"; /** @@ -689,8 +684,8 @@ public final class LeaseAssignmentManager { } private CompletableFuture, List>> loadLeaseListAsync() { - return CompletableFuture.supplyAsync(() -> loadWithRetry(() -> leaseRefresher.listLeasesParallely( - LEASE_ASSIGNMENT_CALL_THREAD_POOL, DEFAULT_LEASE_TABLE_SCAN_PARALLELISM_FACTOR))); + return CompletableFuture.supplyAsync(() -> loadWithRetry(() -> + leaseRefresher.listLeasesParallelyWithDynamicTotalSegments(LEASE_ASSIGNMENT_CALL_THREAD_POOL))); } private T loadWithRetry(final Callable loadFunction) { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java index fc71621d..08ebf0cb 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java @@ -150,6 +150,23 @@ public interface LeaseRefresher { */ List listLeases() throws DependencyException, InvalidStateException, ProvisionedThroughputException; + /** + * List all leases from the storage parallely by dynamically calculating total segments and + * deserialize into Lease objects. Returns the list of leaseKey + * that failed deserialize separately. + * + * @param threadPool thread pool to use for parallel scan + * @return Pair of List of leases from the storage and List of items failed to deserialize + * @throws DependencyException if DynamoDB scan fails in an unexpected way + * @throws InvalidStateException if lease table does not exist + * @throws ProvisionedThroughputException if DynamoDB scan fails due to lack of capacity + */ + default Map.Entry, List> listLeasesParallelyWithDynamicTotalSegments( + final ExecutorService threadPool) + throws DependencyException, InvalidStateException, ProvisionedThroughputException { + throw new UnsupportedOperationException("listLeasesParallelyWithDynamicTotalSegments is not implemented"); + } + /** * List all leases from the storage parallely and deserialize into Lease objects. Returns the list of leaseKey * that failed deserialize separately. diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java index 6a50f67a..85602854 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java @@ -122,6 +122,16 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { private static final String LEASE_OWNER_INDEX_QUERY_CONDITIONAL_EXPRESSION = String.format("%s = %s", LEASE_OWNER_KEY, DDB_LEASE_OWNER); + /** + * Default parallelism factor for scaling lease table. + */ + private static final int DEFAULT_LEASE_TABLE_SCAN_PARALLELISM_FACTOR = 10; + + private static final long NUMBER_OF_BYTES_PER_GB = 1024 * 1024 * 1024; + private static final double GB_PER_SEGMENT = 0.2; + private static final int MIN_SCAN_SEGMENTS = 1; + private static final int MAX_SCAN_SEGMENTS = 1000000; + private static DdbTableConfig createDdbTableConfigFromBillingMode(final BillingMode billingMode) { final DdbTableConfig tableConfig = new DdbTableConfig(); tableConfig.billingMode(billingMode); @@ -546,6 +556,24 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { return list(null, null); } + @Override + public Map.Entry, List> listLeasesParallelyWithDynamicTotalSegments( + final ExecutorService parallelScanExecutorService) + throws DependencyException, InvalidStateException, ProvisionedThroughputException { + + int parallelScanTotalSegment = DEFAULT_LEASE_TABLE_SCAN_PARALLELISM_FACTOR; + DescribeTableResponse describeTableResponse = describeLeaseTable(); + + if (describeTableResponse != null) { + parallelScanTotalSegment = + getParallelScanTotalSegments(describeTableResponse.table().tableSizeBytes()); + } else { + log.info("DescribeTable returned null so using default totalSegments : {}", parallelScanTotalSegment); + } + + return listLeasesParallely(parallelScanExecutorService, parallelScanTotalSegment); + } + @Override public Map.Entry, List> listLeasesParallely( final ExecutorService parallelScanExecutorService, final int parallelScanTotalSegment) @@ -586,6 +614,35 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { return new AbstractMap.SimpleEntry<>(response, leaseItemFailedDeserialize); } + /** + * Calculates the optimal number of parallel scan segments for a DynamoDB table based on its size. + * The calculation follows these rules: + * - Each segment handles 0.2GB (214,748,364 bytes) of data + * - For empty tables or tables smaller than 0.2GB, uses 1 segment + * - Number of segments scales linearly with table size + * + * @return The number of segments to use for parallel scan, minimum 1 + */ + private int getParallelScanTotalSegments(double tableSizeBytes) { + + int totalSegments = DEFAULT_LEASE_TABLE_SCAN_PARALLELISM_FACTOR; + + try { + double tableSizeGB = tableSizeBytes / NUMBER_OF_BYTES_PER_GB; + totalSegments = (int) Math.ceil(tableSizeGB / GB_PER_SEGMENT); + } catch (Exception e) { + log.info( + "Error while getting totalSegments so using default totalSegments : {}. Exception : {}", + DEFAULT_LEASE_TABLE_SCAN_PARALLELISM_FACTOR, + e.getMessage()); + return totalSegments; + } + + totalSegments = Math.min(Math.max(totalSegments, MIN_SCAN_SEGMENTS), MAX_SCAN_SEGMENTS); + log.info("TotalSegments for Lease table parallel scan : {}", totalSegments); + return totalSegments; + } + private List> scanSegment(final int segment, final int parallelScanTotalSegment) throws DependencyException { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManagerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManagerTest.java index 6bc3a581..1c48ed93 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManagerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManagerTest.java @@ -46,7 +46,6 @@ import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; -import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.times; @@ -730,7 +729,8 @@ class LeaseAssignmentManagerTest { final WorkerMetricStatsDAO mockedWorkerMetricsDAO = Mockito.mock(WorkerMetricStatsDAO.class); final LeaseRefresher mockedLeaseRefresher = Mockito.mock(LeaseRefresher.class); - when(mockedLeaseRefresher.listLeasesParallely(any(), anyInt())).thenThrow(new RuntimeException()); + when(mockedLeaseRefresher.listLeasesParallelyWithDynamicTotalSegments(any())) + .thenThrow(new RuntimeException()); when(mockedWorkerMetricsDAO.getAllWorkerMetricStats()).thenThrow(new RuntimeException()); final LeaseAssignmentManager leaseAssignmentManager = new LeaseAssignmentManager( @@ -752,7 +752,7 @@ class LeaseAssignmentManagerTest { leaseAssignmentManagerRunnable.run(); - verify(mockedLeaseRefresher, times(2)).listLeasesParallely(any(), anyInt()); + verify(mockedLeaseRefresher, times(2)).listLeasesParallelyWithDynamicTotalSegments(any()); verify(mockedWorkerMetricsDAO, times(2)).getAllWorkerMetricStats(); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java index 4af44b14..20c48ad4 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java @@ -393,6 +393,34 @@ class DynamoDBLeaseRefresherTest { assertEquals("badLeaseKey", response.getValue().get(0)); } + @Test + void listLeasesParallelyWithDynamicTotalSegments_sanity() + throws ProvisionedThroughputException, DependencyException, InvalidStateException { + DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient); + setupTable(leaseRefresher); + leaseRefresher.createLeaseIfNotExists(createDummyLease("lease1", "leaseOwner1")); + leaseRefresher.createLeaseIfNotExists(createDummyLease("lease2", "leaseOwner2")); + final Map.Entry, List> response = + leaseRefresher.listLeasesParallelyWithDynamicTotalSegments(Executors.newFixedThreadPool(2)); + assertEquals(2, response.getKey().size()); + assertEquals(0, response.getValue().size()); + } + + @Test + void listLeasesParallelyWithDynamicTotalSegments_leaseWithFailingDeserialization_assertCorrectResponse() + throws ProvisionedThroughputException, DependencyException, InvalidStateException { + DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient); + setupTable(leaseRefresher); + leaseRefresher.createLeaseIfNotExists(createDummyLease("lease1", "leaseOwner1")); + createAndPutBadLeaseEntryInTable(); + final Map.Entry, List> response = + leaseRefresher.listLeasesParallelyWithDynamicTotalSegments(Executors.newFixedThreadPool(2)); + assertEquals(1, response.getKey().size()); + assertEquals("lease1", response.getKey().get(0).leaseKey()); + assertEquals(1, response.getValue().size()); + assertEquals("badLeaseKey", response.getValue().get(0)); + } + @Test void initiateGracefulLeaseHandoff_sanity() throws Exception { DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient);