diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManager.java index edd6c2a9..f5ea5470 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManager.java @@ -86,11 +86,6 @@ public final class LeaseAssignmentManager { */ private static final int DEFAULT_LEASE_ASSIGNMENT_MANAGER_FREQ_MULTIPLIER = 2; - /** - * Default parallelism factor for scaling lease table. - */ - private static final int DEFAULT_LEASE_TABLE_SCAN_PARALLELISM_FACTOR = 10; - private static final String FORCE_LEADER_RELEASE_METRIC_NAME = "ForceLeaderRelease"; /** @@ -689,8 +684,8 @@ public final class LeaseAssignmentManager { } private CompletableFuture, List>> loadLeaseListAsync() { - return CompletableFuture.supplyAsync(() -> loadWithRetry(() -> leaseRefresher.listLeasesParallely( - LEASE_ASSIGNMENT_CALL_THREAD_POOL, DEFAULT_LEASE_TABLE_SCAN_PARALLELISM_FACTOR))); + return CompletableFuture.supplyAsync(() -> + loadWithRetry(() -> leaseRefresher.listLeasesParallely(LEASE_ASSIGNMENT_CALL_THREAD_POOL, 0))); } private T loadWithRetry(final Callable loadFunction) { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java index fc71621d..49bec215 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java @@ -154,8 +154,9 @@ public interface LeaseRefresher { * List all leases from the storage parallely and deserialize into Lease objects. Returns the list of leaseKey * that failed deserialize separately. * - * @param threadPool threadpool to use for parallel scan - * @param parallelismFactor no. of parallel scans + * @param threadPool thread pool to use for parallel scan + * @param parallelismFactor no. of parallel scans. + * If parallelismFactor is 0 then parallelismFactor will be calculated based on table size * @return Pair of List of leases from the storage and List of items failed to deserialize * @throws DependencyException if DynamoDB scan fails in an unexpected way * @throws InvalidStateException if lease table does not exist diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java index 6a50f67a..7af76661 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java @@ -15,6 +15,7 @@ package software.amazon.kinesis.leases.dynamodb; import java.time.Duration; +import java.time.Instant; import java.util.AbstractMap; import java.util.ArrayList; import java.util.Collection; @@ -122,6 +123,20 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { private static final String LEASE_OWNER_INDEX_QUERY_CONDITIONAL_EXPRESSION = String.format("%s = %s", LEASE_OWNER_KEY, DDB_LEASE_OWNER); + /** + * Default parallelism factor for scaling lease table. + */ + private static final int DEFAULT_LEASE_TABLE_SCAN_PARALLELISM_FACTOR = 10; + + private static final long NUMBER_OF_BYTES_PER_GB = 1024 * 1024 * 1024; + private static final double GB_PER_SEGMENT = 0.2; + private static final int MIN_SCAN_SEGMENTS = 1; + private static final int MAX_SCAN_SEGMENTS = 30; + + private Integer cachedTotalSegments; + private Instant expirationTimeForTotalSegmentsCache; + private static final Duration CACHE_DURATION_FOR_TOTAL_SEGMENTS = Duration.ofHours(2); + private static DdbTableConfig createDdbTableConfigFromBillingMode(final BillingMode billingMode) { final DdbTableConfig tableConfig = new DdbTableConfig(); tableConfig.billingMode(billingMode); @@ -553,9 +568,17 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { final List leaseItemFailedDeserialize = new ArrayList<>(); final List response = new ArrayList<>(); final List>>> futures = new ArrayList<>(); - for (int i = 0; i < parallelScanTotalSegment; ++i) { + + final int totalSegments; + if (parallelScanTotalSegment > 0) { + totalSegments = parallelScanTotalSegment; + } else { + totalSegments = getParallelScanTotalSegments(); + } + + for (int i = 0; i < totalSegments; ++i) { final int segmentNumber = i; - futures.add(parallelScanExecutorService.submit(() -> scanSegment(segmentNumber, parallelScanTotalSegment))); + futures.add(parallelScanExecutorService.submit(() -> scanSegment(segmentNumber, totalSegments))); } try { for (final Future>> future : futures) { @@ -586,6 +609,41 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { return new AbstractMap.SimpleEntry<>(response, leaseItemFailedDeserialize); } + /** + * Calculates the optimal number of parallel scan segments for a DynamoDB table based on its size. + * The calculation follows these rules: + * - Each segment handles 0.2GB (214,748,364 bytes) of data + * - For empty tables or tables smaller than 0.2GB, uses 1 segment + * - Number of segments scales linearly with table size + * + * @return The number of segments to use for parallel scan, minimum 1 + */ + private synchronized int getParallelScanTotalSegments() throws DependencyException { + if (isTotalSegmentsCacheValid()) { + return cachedTotalSegments; + } + + int parallelScanTotalSegments = + cachedTotalSegments == null ? DEFAULT_LEASE_TABLE_SCAN_PARALLELISM_FACTOR : cachedTotalSegments; + final DescribeTableResponse describeTableResponse = describeLeaseTable(); + if (describeTableResponse == null) { + log.info("DescribeTable returned null so using default totalSegments : {}", parallelScanTotalSegments); + } else { + final double tableSizeGB = (double) describeTableResponse.table().tableSizeBytes() / NUMBER_OF_BYTES_PER_GB; + parallelScanTotalSegments = Math.min( + Math.max((int) Math.ceil(tableSizeGB / GB_PER_SEGMENT), MIN_SCAN_SEGMENTS), MAX_SCAN_SEGMENTS); + + log.info("TotalSegments for Lease table parallel scan : {}", parallelScanTotalSegments); + } + cachedTotalSegments = parallelScanTotalSegments; + expirationTimeForTotalSegmentsCache = Instant.now().plus(CACHE_DURATION_FOR_TOTAL_SEGMENTS); + return parallelScanTotalSegments; + } + + private boolean isTotalSegmentsCacheValid() { + return cachedTotalSegments != null && Instant.now().isBefore(expirationTimeForTotalSegmentsCache); + } + private List> scanSegment(final int segment, final int parallelScanTotalSegment) throws DependencyException { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java index 4af44b14..aa3e95d1 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java @@ -11,6 +11,8 @@ import java.util.concurrent.Executors; import com.amazonaws.services.dynamodbv2.local.embedded.DynamoDBEmbedded; import com.google.common.collect.ImmutableMap; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; import org.mockito.Mockito; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.model.AttributeValue; @@ -22,6 +24,8 @@ import software.amazon.awssdk.services.dynamodb.model.GlobalSecondaryIndexDescri import software.amazon.awssdk.services.dynamodb.model.IndexStatus; import software.amazon.awssdk.services.dynamodb.model.PutItemRequest; import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException; +import software.amazon.awssdk.services.dynamodb.model.ScanRequest; +import software.amazon.awssdk.services.dynamodb.model.ScanResponse; import software.amazon.awssdk.services.dynamodb.model.TableDescription; import software.amazon.awssdk.services.dynamodb.model.TableStatus; import software.amazon.awssdk.services.dynamodb.model.UpdateContinuousBackupsRequest; @@ -393,6 +397,153 @@ class DynamoDBLeaseRefresherTest { assertEquals("badLeaseKey", response.getValue().get(0)); } + @Test + public void listLeasesParallely_UseCachedTotalSegment() + throws ProvisionedThroughputException, DependencyException, InvalidStateException { + DynamoDbAsyncClient mockDdbClient = mock(DynamoDbAsyncClient.class); + final long oneGBInBytes = 1073741824L; + + when(mockDdbClient.describeTable(any(DescribeTableRequest.class))) + .thenReturn(CompletableFuture.completedFuture(DescribeTableResponse.builder() + .table(TableDescription.builder() + .tableName(TEST_LEASE_TABLE) + .tableStatus(TableStatus.ACTIVE) + .tableSizeBytes(oneGBInBytes) + .build()) + .build())); + when(mockDdbClient.scan(any(ScanRequest.class))) + .thenReturn(CompletableFuture.completedFuture( + ScanResponse.builder().items(new ArrayList<>()).build())); + + final LeaseRefresher leaseRefresher = new DynamoDBLeaseRefresher( + TEST_LEASE_TABLE, + mockDdbClient, + new DynamoDBLeaseSerializer(), + true, + NOOP_TABLE_CREATOR_CALLBACK, + Duration.ofSeconds(10), + new DdbTableConfig(), + true, + true, + new ArrayList<>()); + + leaseRefresher.listLeasesParallely(Executors.newFixedThreadPool(2), 0); + verify(mockDdbClient, times(5)).scan(any(ScanRequest.class)); + + // calling second to test cached value is used + leaseRefresher.listLeasesParallely(Executors.newFixedThreadPool(2), 0); + + // verify if describe table is called once even when listLeasesParallely is called twice + verify(mockDdbClient, times(1)).describeTable(any(DescribeTableRequest.class)); + verify(mockDdbClient, times(10)).scan(any(ScanRequest.class)); + } + + @Test + public void listLeasesParallely_DescribeTableNotCalledWhenSegmentGreaterThanZero() + throws ProvisionedThroughputException, DependencyException, InvalidStateException { + DynamoDbAsyncClient mockDdbClient = mock(DynamoDbAsyncClient.class); + + when(mockDdbClient.describeTable(any(DescribeTableRequest.class))) + .thenReturn(CompletableFuture.completedFuture(DescribeTableResponse.builder() + .table(TableDescription.builder() + .tableName(TEST_LEASE_TABLE) + .tableStatus(TableStatus.ACTIVE) + .tableSizeBytes(1000L) + .build()) + .build())); + when(mockDdbClient.scan(any(ScanRequest.class))) + .thenReturn(CompletableFuture.completedFuture( + ScanResponse.builder().items(new ArrayList<>()).build())); + + final LeaseRefresher leaseRefresher = new DynamoDBLeaseRefresher( + TEST_LEASE_TABLE, + mockDdbClient, + new DynamoDBLeaseSerializer(), + true, + NOOP_TABLE_CREATOR_CALLBACK, + Duration.ofSeconds(10), + new DdbTableConfig(), + true, + true, + new ArrayList<>()); + + leaseRefresher.listLeasesParallely(Executors.newFixedThreadPool(2), 2); + verify(mockDdbClient, times(0)).describeTable(any(DescribeTableRequest.class)); + } + + @Test + public void listLeasesParallely_TotalSegmentIsDefaultWhenDescribeTableThrowsException() + throws ProvisionedThroughputException, DependencyException, InvalidStateException { + DynamoDbAsyncClient mockDdbClient = mock(DynamoDbAsyncClient.class); + + when(mockDdbClient.describeTable(any(DescribeTableRequest.class))) + .thenThrow(ResourceNotFoundException.builder() + .message("Mock table does not exist scenario") + .build()); + + when(mockDdbClient.scan(any(ScanRequest.class))) + .thenReturn(CompletableFuture.completedFuture( + ScanResponse.builder().items(new ArrayList<>()).build())); + + final LeaseRefresher leaseRefresher = new DynamoDBLeaseRefresher( + TEST_LEASE_TABLE, + mockDdbClient, + new DynamoDBLeaseSerializer(), + true, + NOOP_TABLE_CREATOR_CALLBACK, + Duration.ofSeconds(10), + new DdbTableConfig(), + true, + true, + new ArrayList<>()); + + leaseRefresher.listLeasesParallely(Executors.newFixedThreadPool(2), 0); + verify(mockDdbClient, times(10)).scan(any(ScanRequest.class)); + } + + @ParameterizedTest + @CsvSource({ + "0, 1", // 0 + "1024, 1", // 1KB + "104857600, 1", // 100MB + "214748364, 1", // 0.2GB + "322122547, 2", // 1.3GB + "1073741824, 5", // 1GB + "2147483648, 10", // 2GB + "5368709120, 25", // 5GB + }) + public void listLeasesParallely_TotalSegmentForDifferentTableSize(long tableSizeBytes, int totalSegments) + throws ProvisionedThroughputException, DependencyException, InvalidStateException { + DynamoDbAsyncClient mockDdbClient = mock(DynamoDbAsyncClient.class); + + when(mockDdbClient.describeTable(any(DescribeTableRequest.class))) + .thenReturn(CompletableFuture.completedFuture(DescribeTableResponse.builder() + .table(TableDescription.builder() + .tableName(TEST_LEASE_TABLE) + .tableStatus(TableStatus.ACTIVE) + .tableSizeBytes(tableSizeBytes) + .build()) + .build())); + when(mockDdbClient.scan(any(ScanRequest.class))) + .thenReturn(CompletableFuture.completedFuture( + ScanResponse.builder().items(new ArrayList<>()).build())); + + final LeaseRefresher leaseRefresher = new DynamoDBLeaseRefresher( + TEST_LEASE_TABLE, + mockDdbClient, + new DynamoDBLeaseSerializer(), + true, + NOOP_TABLE_CREATOR_CALLBACK, + Duration.ofSeconds(10), + new DdbTableConfig(), + true, + true, + new ArrayList<>()); + + leaseRefresher.listLeasesParallely(Executors.newFixedThreadPool(2), 0); + verify(mockDdbClient, times(totalSegments)).scan(any(ScanRequest.class)); + } + @Test void initiateGracefulLeaseHandoff_sanity() throws Exception { DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient);