diff --git a/CHANGELOG.md b/CHANGELOG.md
index 408712ed..4db944be 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,15 @@ For **1.x** release notes, please see [v1.x/CHANGELOG.md](https://github.com/aws
For **2.x** release notes, please see [v2.x/CHANGELOG.md](https://github.com/awslabs/amazon-kinesis-client/blob/v2.x/CHANGELOG.md)
---
+### Release 3.0.2 (March 12, 2025)
+* [#1443](https://github.com/awslabs/amazon-kinesis-client/pull/1443) Reduce DynamoDB IOPS for smaller stream and worker count applications
+* The below two PRs are intended to support [DynamoDB Streams Kinesis Adapter](https://github.com/awslabs/dynamodb-streams-kinesis-adapter) compatibility
+ * [#1441](https://github.com/awslabs/amazon-kinesis-client/pull/1441) Make consumerTaskFactory overridable by customers
+ * [#1440](https://github.com/awslabs/amazon-kinesis-client/pull/1440) Make ShutdownTask, ProcessTask, InitializeTask, BlockOnParentTask, and ShutdownNotificationTask overridable by customers
+* [#1437](https://github.com/awslabs/amazon-kinesis-client/pull/1437) Suppress LeaseAssignmentManager excessive WARN logs when there is no issue
+* [#1439](https://github.com/awslabs/amazon-kinesis-client/pull/1439) Upgrade io.netty:netty-handler from 4.1.108.Final to 4.1.118.Final
+* [#1400](https://github.com/awslabs/amazon-kinesis-client/pull/1400) Upgrade com.fasterxml.jackson.core:jackson-databind from 2.10.1 to 2.12.7.1
+
### Release 3.0.1 (November 14, 2024)
* [#1401](https://github.com/awslabs/amazon-kinesis-client/pull/1401) Fixed the lease graceful handoff behavior in the multi-stream processing mode
* [#1398](https://github.com/awslabs/amazon-kinesis-client/pull/1398) Addressed several KCL 3.0 related issues raised via GitHub
diff --git a/README.md b/README.md
index cbee9fcc..8be5a1fe 100644
--- a/README.md
+++ b/README.md
@@ -68,7 +68,7 @@ This error occurs due to [a known issue](https://github.com/aws/aws-sdk-java-v2/
software.amazon.kinesis
amazon-kinesis-client
- 3.0.1
+ 3.0.2
```
diff --git a/amazon-kinesis-client-multilang/pom.xml b/amazon-kinesis-client-multilang/pom.xml
index c8a78183..11d6de61 100644
--- a/amazon-kinesis-client-multilang/pom.xml
+++ b/amazon-kinesis-client-multilang/pom.xml
@@ -21,7 +21,7 @@
amazon-kinesis-client-pom
software.amazon.kinesis
- 3.0.2-SNAPSHOT
+ 3.0.2
4.0.0
diff --git a/amazon-kinesis-client/pom.xml b/amazon-kinesis-client/pom.xml
index b54314df..b1efb6cf 100644
--- a/amazon-kinesis-client/pom.xml
+++ b/amazon-kinesis-client/pom.xml
@@ -23,7 +23,7 @@
software.amazon.kinesis
amazon-kinesis-client-pom
- 3.0.2-SNAPSHOT
+ 3.0.2
amazon-kinesis-client
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManager.java
index edd6c2a9..f5ea5470 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManager.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManager.java
@@ -86,11 +86,6 @@ public final class LeaseAssignmentManager {
*/
private static final int DEFAULT_LEASE_ASSIGNMENT_MANAGER_FREQ_MULTIPLIER = 2;
- /**
- * Default parallelism factor for scaling lease table.
- */
- private static final int DEFAULT_LEASE_TABLE_SCAN_PARALLELISM_FACTOR = 10;
-
private static final String FORCE_LEADER_RELEASE_METRIC_NAME = "ForceLeaderRelease";
/**
@@ -689,8 +684,8 @@ public final class LeaseAssignmentManager {
}
private CompletableFuture, List>> loadLeaseListAsync() {
- return CompletableFuture.supplyAsync(() -> loadWithRetry(() -> leaseRefresher.listLeasesParallely(
- LEASE_ASSIGNMENT_CALL_THREAD_POOL, DEFAULT_LEASE_TABLE_SCAN_PARALLELISM_FACTOR)));
+ return CompletableFuture.supplyAsync(() ->
+ loadWithRetry(() -> leaseRefresher.listLeasesParallely(LEASE_ASSIGNMENT_CALL_THREAD_POOL, 0)));
}
private T loadWithRetry(final Callable loadFunction) {
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java
index fc71621d..49bec215 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java
@@ -154,8 +154,9 @@ public interface LeaseRefresher {
* List all leases from the storage parallely and deserialize into Lease objects. Returns the list of leaseKey
* that failed deserialize separately.
*
- * @param threadPool threadpool to use for parallel scan
- * @param parallelismFactor no. of parallel scans
+ * @param threadPool thread pool to use for parallel scan
+ * @param parallelismFactor no. of parallel scans.
+ * If parallelismFactor is 0 then parallelismFactor will be calculated based on table size
* @return Pair of List of leases from the storage and List of items failed to deserialize
* @throws DependencyException if DynamoDB scan fails in an unexpected way
* @throws InvalidStateException if lease table does not exist
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java
index 6a50f67a..7af76661 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java
@@ -15,6 +15,7 @@
package software.amazon.kinesis.leases.dynamodb;
import java.time.Duration;
+import java.time.Instant;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
@@ -122,6 +123,20 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
private static final String LEASE_OWNER_INDEX_QUERY_CONDITIONAL_EXPRESSION =
String.format("%s = %s", LEASE_OWNER_KEY, DDB_LEASE_OWNER);
+ /**
+ * Default parallelism factor for scaling lease table.
+ */
+ private static final int DEFAULT_LEASE_TABLE_SCAN_PARALLELISM_FACTOR = 10;
+
+ private static final long NUMBER_OF_BYTES_PER_GB = 1024 * 1024 * 1024;
+ private static final double GB_PER_SEGMENT = 0.2;
+ private static final int MIN_SCAN_SEGMENTS = 1;
+ private static final int MAX_SCAN_SEGMENTS = 30;
+
+ private Integer cachedTotalSegments;
+ private Instant expirationTimeForTotalSegmentsCache;
+ private static final Duration CACHE_DURATION_FOR_TOTAL_SEGMENTS = Duration.ofHours(2);
+
private static DdbTableConfig createDdbTableConfigFromBillingMode(final BillingMode billingMode) {
final DdbTableConfig tableConfig = new DdbTableConfig();
tableConfig.billingMode(billingMode);
@@ -553,9 +568,17 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
final List leaseItemFailedDeserialize = new ArrayList<>();
final List response = new ArrayList<>();
final List>>> futures = new ArrayList<>();
- for (int i = 0; i < parallelScanTotalSegment; ++i) {
+
+ final int totalSegments;
+ if (parallelScanTotalSegment > 0) {
+ totalSegments = parallelScanTotalSegment;
+ } else {
+ totalSegments = getParallelScanTotalSegments();
+ }
+
+ for (int i = 0; i < totalSegments; ++i) {
final int segmentNumber = i;
- futures.add(parallelScanExecutorService.submit(() -> scanSegment(segmentNumber, parallelScanTotalSegment)));
+ futures.add(parallelScanExecutorService.submit(() -> scanSegment(segmentNumber, totalSegments)));
}
try {
for (final Future>> future : futures) {
@@ -586,6 +609,41 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
return new AbstractMap.SimpleEntry<>(response, leaseItemFailedDeserialize);
}
+ /**
+ * Calculates the optimal number of parallel scan segments for a DynamoDB table based on its size.
+ * The calculation follows these rules:
+ * - Each segment handles 0.2GB (214,748,364 bytes) of data
+ * - For empty tables or tables smaller than 0.2GB, uses 1 segment
+ * - Number of segments scales linearly with table size
+ *
+ * @return The number of segments to use for parallel scan, minimum 1
+ */
+ private synchronized int getParallelScanTotalSegments() throws DependencyException {
+ if (isTotalSegmentsCacheValid()) {
+ return cachedTotalSegments;
+ }
+
+ int parallelScanTotalSegments =
+ cachedTotalSegments == null ? DEFAULT_LEASE_TABLE_SCAN_PARALLELISM_FACTOR : cachedTotalSegments;
+ final DescribeTableResponse describeTableResponse = describeLeaseTable();
+ if (describeTableResponse == null) {
+ log.info("DescribeTable returned null so using default totalSegments : {}", parallelScanTotalSegments);
+ } else {
+ final double tableSizeGB = (double) describeTableResponse.table().tableSizeBytes() / NUMBER_OF_BYTES_PER_GB;
+ parallelScanTotalSegments = Math.min(
+ Math.max((int) Math.ceil(tableSizeGB / GB_PER_SEGMENT), MIN_SCAN_SEGMENTS), MAX_SCAN_SEGMENTS);
+
+ log.info("TotalSegments for Lease table parallel scan : {}", parallelScanTotalSegments);
+ }
+ cachedTotalSegments = parallelScanTotalSegments;
+ expirationTimeForTotalSegmentsCache = Instant.now().plus(CACHE_DURATION_FOR_TOTAL_SEGMENTS);
+ return parallelScanTotalSegments;
+ }
+
+ private boolean isTotalSegmentsCacheValid() {
+ return cachedTotalSegments != null && Instant.now().isBefore(expirationTimeForTotalSegmentsCache);
+ }
+
private List