diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java index 3e60b9cc..6db19b7f 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java @@ -15,6 +15,7 @@ package software.amazon.kinesis.leases.dynamodb; import java.time.Duration; +import java.time.Instant; import java.util.AbstractMap; import java.util.ArrayList; import java.util.Collection; @@ -132,6 +133,10 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { private static final int MIN_SCAN_SEGMENTS = 1; private static final int MAX_SCAN_SEGMENTS = 30; + private volatile Integer cachedTotalSegments; + private volatile Instant expirationTimeForTotalSegmentsCache; + private static final Duration CACHE_DURATION_FOR_TOTAL_SEGMENTS = Duration.ofHours(2); + private static DdbTableConfig createDdbTableConfigFromBillingMode(final BillingMode billingMode) { final DdbTableConfig tableConfig = new DdbTableConfig(); tableConfig.billingMode(billingMode); @@ -560,18 +565,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { public Map.Entry, List> listLeasesParallelyWithDynamicTotalSegments( final ExecutorService parallelScanExecutorService) throws DependencyException, InvalidStateException, ProvisionedThroughputException { - - int parallelScanTotalSegment = DEFAULT_LEASE_TABLE_SCAN_PARALLELISM_FACTOR; - DescribeTableResponse describeTableResponse = describeLeaseTable(); - - if (describeTableResponse != null) { - parallelScanTotalSegment = - getParallelScanTotalSegments(describeTableResponse.table().tableSizeBytes()); - } else { - log.info("DescribeTable returned null so using default totalSegments : {}", parallelScanTotalSegment); - } - - return listLeasesParallely(parallelScanExecutorService, parallelScanTotalSegment); + return listLeasesParallely(parallelScanExecutorService, getParallelScanTotalSegments()); } @Override @@ -623,13 +617,30 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { * * @return The number of segments to use for parallel scan, minimum 1 */ - private int getParallelScanTotalSegments(double tableSizeBytes) { + private int getParallelScanTotalSegments() throws DependencyException { + if (isTotalSegmentsCacheValid()) { + log.info("Cached value used : TotalSegments for Lease table parallel scan : {}", cachedTotalSegments); + return cachedTotalSegments; + } - double tableSizeGB = tableSizeBytes / NUMBER_OF_BYTES_PER_GB; - int totalSegments = - Math.min(Math.max((int) Math.ceil(tableSizeGB / GB_PER_SEGMENT), MIN_SCAN_SEGMENTS), MAX_SCAN_SEGMENTS); - log.info("TotalSegments for Lease table parallel scan : {}", totalSegments); - return totalSegments; + int parallelScanTotalSegments = DEFAULT_LEASE_TABLE_SCAN_PARALLELISM_FACTOR; + DescribeTableResponse describeTableResponse = describeLeaseTable(); + + if (describeTableResponse == null) { + log.info("DescribeTable returned null so using default totalSegments : {}", parallelScanTotalSegments); + } else { + final double tableSizeGB = (double) describeTableResponse.table().tableSizeBytes() / NUMBER_OF_BYTES_PER_GB; + parallelScanTotalSegments = Math.min( + Math.max((int) Math.ceil(tableSizeGB / GB_PER_SEGMENT), MIN_SCAN_SEGMENTS), MAX_SCAN_SEGMENTS); + log.info("TotalSegments for Lease table parallel scan : {}", parallelScanTotalSegments); + } + cachedTotalSegments = parallelScanTotalSegments; + expirationTimeForTotalSegmentsCache = Instant.now().plus(CACHE_DURATION_FOR_TOTAL_SEGMENTS); + return parallelScanTotalSegments; + } + + private boolean isTotalSegmentsCacheValid() { + return cachedTotalSegments != null && Instant.now().isBefore(expirationTimeForTotalSegmentsCache); } private List> scanSegment(final int segment, final int parallelScanTotalSegment)