From 7b5ebaeb93ffe8a16f625c952da88a21481c848c Mon Sep 17 00:00:00 2001 From: ehasah-aws Date: Wed, 12 Mar 2025 10:10:19 -0700 Subject: [PATCH 1/2] Calculate scan segment by table size (#1443) * Fixes DDB usage spike issue * Removed un-necessary exception handling * made max total segment size 30 * Cached total scan segment * Reuse listLeasesParallely api to dynamically calculate total segment * Added unit tests and made getParallelScanTotalSegments synchronous * Simplified getParallelScanTotalSegments method * Fall back to previously calculated totalScan * fixed formating --- .../assignment/LeaseAssignmentManager.java | 9 +- .../amazon/kinesis/leases/LeaseRefresher.java | 5 +- .../dynamodb/DynamoDBLeaseRefresher.java | 62 ++++++- .../dynamodb/DynamoDBLeaseRefresherTest.java | 151 ++++++++++++++++++ 4 files changed, 216 insertions(+), 11 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManager.java index edd6c2a9..f5ea5470 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManager.java @@ -86,11 +86,6 @@ public final class LeaseAssignmentManager { */ private static final int DEFAULT_LEASE_ASSIGNMENT_MANAGER_FREQ_MULTIPLIER = 2; - /** - * Default parallelism factor for scaling lease table. - */ - private static final int DEFAULT_LEASE_TABLE_SCAN_PARALLELISM_FACTOR = 10; - private static final String FORCE_LEADER_RELEASE_METRIC_NAME = "ForceLeaderRelease"; /** @@ -689,8 +684,8 @@ public final class LeaseAssignmentManager { } private CompletableFuture, List>> loadLeaseListAsync() { - return CompletableFuture.supplyAsync(() -> loadWithRetry(() -> leaseRefresher.listLeasesParallely( - LEASE_ASSIGNMENT_CALL_THREAD_POOL, DEFAULT_LEASE_TABLE_SCAN_PARALLELISM_FACTOR))); + return CompletableFuture.supplyAsync(() -> + loadWithRetry(() -> leaseRefresher.listLeasesParallely(LEASE_ASSIGNMENT_CALL_THREAD_POOL, 0))); } private T loadWithRetry(final Callable loadFunction) { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java index fc71621d..49bec215 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java @@ -154,8 +154,9 @@ public interface LeaseRefresher { * List all leases from the storage parallely and deserialize into Lease objects. Returns the list of leaseKey * that failed deserialize separately. * - * @param threadPool threadpool to use for parallel scan - * @param parallelismFactor no. of parallel scans + * @param threadPool thread pool to use for parallel scan + * @param parallelismFactor no. of parallel scans. + * If parallelismFactor is 0 then parallelismFactor will be calculated based on table size * @return Pair of List of leases from the storage and List of items failed to deserialize * @throws DependencyException if DynamoDB scan fails in an unexpected way * @throws InvalidStateException if lease table does not exist diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java index 6a50f67a..7af76661 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java @@ -15,6 +15,7 @@ package software.amazon.kinesis.leases.dynamodb; import java.time.Duration; +import java.time.Instant; import java.util.AbstractMap; import java.util.ArrayList; import java.util.Collection; @@ -122,6 +123,20 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { private static final String LEASE_OWNER_INDEX_QUERY_CONDITIONAL_EXPRESSION = String.format("%s = %s", LEASE_OWNER_KEY, DDB_LEASE_OWNER); + /** + * Default parallelism factor for scaling lease table. + */ + private static final int DEFAULT_LEASE_TABLE_SCAN_PARALLELISM_FACTOR = 10; + + private static final long NUMBER_OF_BYTES_PER_GB = 1024 * 1024 * 1024; + private static final double GB_PER_SEGMENT = 0.2; + private static final int MIN_SCAN_SEGMENTS = 1; + private static final int MAX_SCAN_SEGMENTS = 30; + + private Integer cachedTotalSegments; + private Instant expirationTimeForTotalSegmentsCache; + private static final Duration CACHE_DURATION_FOR_TOTAL_SEGMENTS = Duration.ofHours(2); + private static DdbTableConfig createDdbTableConfigFromBillingMode(final BillingMode billingMode) { final DdbTableConfig tableConfig = new DdbTableConfig(); tableConfig.billingMode(billingMode); @@ -553,9 +568,17 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { final List leaseItemFailedDeserialize = new ArrayList<>(); final List response = new ArrayList<>(); final List>>> futures = new ArrayList<>(); - for (int i = 0; i < parallelScanTotalSegment; ++i) { + + final int totalSegments; + if (parallelScanTotalSegment > 0) { + totalSegments = parallelScanTotalSegment; + } else { + totalSegments = getParallelScanTotalSegments(); + } + + for (int i = 0; i < totalSegments; ++i) { final int segmentNumber = i; - futures.add(parallelScanExecutorService.submit(() -> scanSegment(segmentNumber, parallelScanTotalSegment))); + futures.add(parallelScanExecutorService.submit(() -> scanSegment(segmentNumber, totalSegments))); } try { for (final Future>> future : futures) { @@ -586,6 +609,41 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { return new AbstractMap.SimpleEntry<>(response, leaseItemFailedDeserialize); } + /** + * Calculates the optimal number of parallel scan segments for a DynamoDB table based on its size. + * The calculation follows these rules: + * - Each segment handles 0.2GB (214,748,364 bytes) of data + * - For empty tables or tables smaller than 0.2GB, uses 1 segment + * - Number of segments scales linearly with table size + * + * @return The number of segments to use for parallel scan, minimum 1 + */ + private synchronized int getParallelScanTotalSegments() throws DependencyException { + if (isTotalSegmentsCacheValid()) { + return cachedTotalSegments; + } + + int parallelScanTotalSegments = + cachedTotalSegments == null ? DEFAULT_LEASE_TABLE_SCAN_PARALLELISM_FACTOR : cachedTotalSegments; + final DescribeTableResponse describeTableResponse = describeLeaseTable(); + if (describeTableResponse == null) { + log.info("DescribeTable returned null so using default totalSegments : {}", parallelScanTotalSegments); + } else { + final double tableSizeGB = (double) describeTableResponse.table().tableSizeBytes() / NUMBER_OF_BYTES_PER_GB; + parallelScanTotalSegments = Math.min( + Math.max((int) Math.ceil(tableSizeGB / GB_PER_SEGMENT), MIN_SCAN_SEGMENTS), MAX_SCAN_SEGMENTS); + + log.info("TotalSegments for Lease table parallel scan : {}", parallelScanTotalSegments); + } + cachedTotalSegments = parallelScanTotalSegments; + expirationTimeForTotalSegmentsCache = Instant.now().plus(CACHE_DURATION_FOR_TOTAL_SEGMENTS); + return parallelScanTotalSegments; + } + + private boolean isTotalSegmentsCacheValid() { + return cachedTotalSegments != null && Instant.now().isBefore(expirationTimeForTotalSegmentsCache); + } + private List> scanSegment(final int segment, final int parallelScanTotalSegment) throws DependencyException { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java index 4af44b14..aa3e95d1 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java @@ -11,6 +11,8 @@ import java.util.concurrent.Executors; import com.amazonaws.services.dynamodbv2.local.embedded.DynamoDBEmbedded; import com.google.common.collect.ImmutableMap; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; import org.mockito.Mockito; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.model.AttributeValue; @@ -22,6 +24,8 @@ import software.amazon.awssdk.services.dynamodb.model.GlobalSecondaryIndexDescri import software.amazon.awssdk.services.dynamodb.model.IndexStatus; import software.amazon.awssdk.services.dynamodb.model.PutItemRequest; import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException; +import software.amazon.awssdk.services.dynamodb.model.ScanRequest; +import software.amazon.awssdk.services.dynamodb.model.ScanResponse; import software.amazon.awssdk.services.dynamodb.model.TableDescription; import software.amazon.awssdk.services.dynamodb.model.TableStatus; import software.amazon.awssdk.services.dynamodb.model.UpdateContinuousBackupsRequest; @@ -393,6 +397,153 @@ class DynamoDBLeaseRefresherTest { assertEquals("badLeaseKey", response.getValue().get(0)); } + @Test + public void listLeasesParallely_UseCachedTotalSegment() + throws ProvisionedThroughputException, DependencyException, InvalidStateException { + DynamoDbAsyncClient mockDdbClient = mock(DynamoDbAsyncClient.class); + final long oneGBInBytes = 1073741824L; + + when(mockDdbClient.describeTable(any(DescribeTableRequest.class))) + .thenReturn(CompletableFuture.completedFuture(DescribeTableResponse.builder() + .table(TableDescription.builder() + .tableName(TEST_LEASE_TABLE) + .tableStatus(TableStatus.ACTIVE) + .tableSizeBytes(oneGBInBytes) + .build()) + .build())); + when(mockDdbClient.scan(any(ScanRequest.class))) + .thenReturn(CompletableFuture.completedFuture( + ScanResponse.builder().items(new ArrayList<>()).build())); + + final LeaseRefresher leaseRefresher = new DynamoDBLeaseRefresher( + TEST_LEASE_TABLE, + mockDdbClient, + new DynamoDBLeaseSerializer(), + true, + NOOP_TABLE_CREATOR_CALLBACK, + Duration.ofSeconds(10), + new DdbTableConfig(), + true, + true, + new ArrayList<>()); + + leaseRefresher.listLeasesParallely(Executors.newFixedThreadPool(2), 0); + verify(mockDdbClient, times(5)).scan(any(ScanRequest.class)); + + // calling second to test cached value is used + leaseRefresher.listLeasesParallely(Executors.newFixedThreadPool(2), 0); + + // verify if describe table is called once even when listLeasesParallely is called twice + verify(mockDdbClient, times(1)).describeTable(any(DescribeTableRequest.class)); + verify(mockDdbClient, times(10)).scan(any(ScanRequest.class)); + } + + @Test + public void listLeasesParallely_DescribeTableNotCalledWhenSegmentGreaterThanZero() + throws ProvisionedThroughputException, DependencyException, InvalidStateException { + DynamoDbAsyncClient mockDdbClient = mock(DynamoDbAsyncClient.class); + + when(mockDdbClient.describeTable(any(DescribeTableRequest.class))) + .thenReturn(CompletableFuture.completedFuture(DescribeTableResponse.builder() + .table(TableDescription.builder() + .tableName(TEST_LEASE_TABLE) + .tableStatus(TableStatus.ACTIVE) + .tableSizeBytes(1000L) + .build()) + .build())); + when(mockDdbClient.scan(any(ScanRequest.class))) + .thenReturn(CompletableFuture.completedFuture( + ScanResponse.builder().items(new ArrayList<>()).build())); + + final LeaseRefresher leaseRefresher = new DynamoDBLeaseRefresher( + TEST_LEASE_TABLE, + mockDdbClient, + new DynamoDBLeaseSerializer(), + true, + NOOP_TABLE_CREATOR_CALLBACK, + Duration.ofSeconds(10), + new DdbTableConfig(), + true, + true, + new ArrayList<>()); + + leaseRefresher.listLeasesParallely(Executors.newFixedThreadPool(2), 2); + verify(mockDdbClient, times(0)).describeTable(any(DescribeTableRequest.class)); + } + + @Test + public void listLeasesParallely_TotalSegmentIsDefaultWhenDescribeTableThrowsException() + throws ProvisionedThroughputException, DependencyException, InvalidStateException { + DynamoDbAsyncClient mockDdbClient = mock(DynamoDbAsyncClient.class); + + when(mockDdbClient.describeTable(any(DescribeTableRequest.class))) + .thenThrow(ResourceNotFoundException.builder() + .message("Mock table does not exist scenario") + .build()); + + when(mockDdbClient.scan(any(ScanRequest.class))) + .thenReturn(CompletableFuture.completedFuture( + ScanResponse.builder().items(new ArrayList<>()).build())); + + final LeaseRefresher leaseRefresher = new DynamoDBLeaseRefresher( + TEST_LEASE_TABLE, + mockDdbClient, + new DynamoDBLeaseSerializer(), + true, + NOOP_TABLE_CREATOR_CALLBACK, + Duration.ofSeconds(10), + new DdbTableConfig(), + true, + true, + new ArrayList<>()); + + leaseRefresher.listLeasesParallely(Executors.newFixedThreadPool(2), 0); + verify(mockDdbClient, times(10)).scan(any(ScanRequest.class)); + } + + @ParameterizedTest + @CsvSource({ + "0, 1", // 0 + "1024, 1", // 1KB + "104857600, 1", // 100MB + "214748364, 1", // 0.2GB + "322122547, 2", // 1.3GB + "1073741824, 5", // 1GB + "2147483648, 10", // 2GB + "5368709120, 25", // 5GB + }) + public void listLeasesParallely_TotalSegmentForDifferentTableSize(long tableSizeBytes, int totalSegments) + throws ProvisionedThroughputException, DependencyException, InvalidStateException { + DynamoDbAsyncClient mockDdbClient = mock(DynamoDbAsyncClient.class); + + when(mockDdbClient.describeTable(any(DescribeTableRequest.class))) + .thenReturn(CompletableFuture.completedFuture(DescribeTableResponse.builder() + .table(TableDescription.builder() + .tableName(TEST_LEASE_TABLE) + .tableStatus(TableStatus.ACTIVE) + .tableSizeBytes(tableSizeBytes) + .build()) + .build())); + when(mockDdbClient.scan(any(ScanRequest.class))) + .thenReturn(CompletableFuture.completedFuture( + ScanResponse.builder().items(new ArrayList<>()).build())); + + final LeaseRefresher leaseRefresher = new DynamoDBLeaseRefresher( + TEST_LEASE_TABLE, + mockDdbClient, + new DynamoDBLeaseSerializer(), + true, + NOOP_TABLE_CREATOR_CALLBACK, + Duration.ofSeconds(10), + new DdbTableConfig(), + true, + true, + new ArrayList<>()); + + leaseRefresher.listLeasesParallely(Executors.newFixedThreadPool(2), 0); + verify(mockDdbClient, times(totalSegments)).scan(any(ScanRequest.class)); + } + @Test void initiateGracefulLeaseHandoff_sanity() throws Exception { DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient); From 5263b4227ce7210d52bec6817191d43f047cd1b2 Mon Sep 17 00:00:00 2001 From: lucienlu-aws <132623944+lucienlu-aws@users.noreply.github.com> Date: Wed, 12 Mar 2025 10:22:41 -0700 Subject: [PATCH 2/2] Prepare for release 3.0.2 (#1447) --- CHANGELOG.md | 9 +++++++++ README.md | 2 +- amazon-kinesis-client-multilang/pom.xml | 2 +- amazon-kinesis-client/pom.xml | 2 +- pom.xml | 2 +- 5 files changed, 13 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 408712ed..4db944be 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,15 @@ For **1.x** release notes, please see [v1.x/CHANGELOG.md](https://github.com/aws For **2.x** release notes, please see [v2.x/CHANGELOG.md](https://github.com/awslabs/amazon-kinesis-client/blob/v2.x/CHANGELOG.md) --- +### Release 3.0.2 (March 12, 2025) +* [#1443](https://github.com/awslabs/amazon-kinesis-client/pull/1443) Reduce DynamoDB IOPS for smaller stream and worker count applications +* The below two PRs are intended to support [DynamoDB Streams Kinesis Adapter](https://github.com/awslabs/dynamodb-streams-kinesis-adapter) compatibility + * [#1441](https://github.com/awslabs/amazon-kinesis-client/pull/1441) Make consumerTaskFactory overridable by customers + * [#1440](https://github.com/awslabs/amazon-kinesis-client/pull/1440) Make ShutdownTask, ProcessTask, InitializeTask, BlockOnParentTask, and ShutdownNotificationTask overridable by customers +* [#1437](https://github.com/awslabs/amazon-kinesis-client/pull/1437) Suppress LeaseAssignmentManager excessive WARN logs when there is no issue +* [#1439](https://github.com/awslabs/amazon-kinesis-client/pull/1439) Upgrade io.netty:netty-handler from 4.1.108.Final to 4.1.118.Final +* [#1400](https://github.com/awslabs/amazon-kinesis-client/pull/1400) Upgrade com.fasterxml.jackson.core:jackson-databind from 2.10.1 to 2.12.7.1 + ### Release 3.0.1 (November 14, 2024) * [#1401](https://github.com/awslabs/amazon-kinesis-client/pull/1401) Fixed the lease graceful handoff behavior in the multi-stream processing mode * [#1398](https://github.com/awslabs/amazon-kinesis-client/pull/1398) Addressed several KCL 3.0 related issues raised via GitHub diff --git a/README.md b/README.md index cbee9fcc..8be5a1fe 100644 --- a/README.md +++ b/README.md @@ -68,7 +68,7 @@ This error occurs due to [a known issue](https://github.com/aws/aws-sdk-java-v2/ software.amazon.kinesis amazon-kinesis-client - 3.0.1 + 3.0.2 ``` diff --git a/amazon-kinesis-client-multilang/pom.xml b/amazon-kinesis-client-multilang/pom.xml index c8a78183..11d6de61 100644 --- a/amazon-kinesis-client-multilang/pom.xml +++ b/amazon-kinesis-client-multilang/pom.xml @@ -21,7 +21,7 @@ amazon-kinesis-client-pom software.amazon.kinesis - 3.0.2-SNAPSHOT + 3.0.2 4.0.0 diff --git a/amazon-kinesis-client/pom.xml b/amazon-kinesis-client/pom.xml index b54314df..b1efb6cf 100644 --- a/amazon-kinesis-client/pom.xml +++ b/amazon-kinesis-client/pom.xml @@ -23,7 +23,7 @@ software.amazon.kinesis amazon-kinesis-client-pom - 3.0.2-SNAPSHOT + 3.0.2 amazon-kinesis-client diff --git a/pom.xml b/pom.xml index 3add1d15..8f1ef78b 100644 --- a/pom.xml +++ b/pom.xml @@ -22,7 +22,7 @@ amazon-kinesis-client-pom pom Amazon Kinesis Client Library - 3.0.2-SNAPSHOT + 3.0.2 The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data from Amazon Kinesis.