diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java index 1ce2a033..06808d25 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java @@ -136,7 +136,6 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { private Integer cachedTotalSegments; private Instant expirationTimeForTotalSegmentsCache; private static final Duration CACHE_DURATION_FOR_TOTAL_SEGMENTS = Duration.ofHours(2); - private final Object lock = new Object(); private static DdbTableConfig createDdbTableConfigFromBillingMode(final BillingMode billingMode) { final DdbTableConfig tableConfig = new DdbTableConfig(); @@ -619,27 +618,26 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { * * @return The number of segments to use for parallel scan, minimum 1 */ - private int getParallelScanTotalSegments() throws DependencyException { + private synchronized int getParallelScanTotalSegments() throws DependencyException { if (isTotalSegmentsCacheValid()) { return cachedTotalSegments; } int parallelScanTotalSegments = DEFAULT_LEASE_TABLE_SCAN_PARALLELISM_FACTOR; - synchronized(lock) { - if (isTotalSegmentsCacheValid()) { - return cachedTotalSegments; - } - DescribeTableResponse describeTableResponse = describeLeaseTable(); - if (describeTableResponse == null) { - log.info("DescribeTable returned null so using default totalSegments : {}", parallelScanTotalSegments); - } else { - final double tableSizeGB = (double) describeTableResponse.table().tableSizeBytes() / NUMBER_OF_BYTES_PER_GB; - parallelScanTotalSegments = Math.min( - Math.max((int) Math.ceil(tableSizeGB / GB_PER_SEGMENT), MIN_SCAN_SEGMENTS), MAX_SCAN_SEGMENTS); - } - cachedTotalSegments = parallelScanTotalSegments; - expirationTimeForTotalSegmentsCache = Instant.now().plus(CACHE_DURATION_FOR_TOTAL_SEGMENTS); + if (isTotalSegmentsCacheValid()) { + return cachedTotalSegments; } - log.info("TotalSegments for Lease table parallel scan : {}", parallelScanTotalSegments); + DescribeTableResponse describeTableResponse = describeLeaseTable(); + if (describeTableResponse == null) { + log.info("DescribeTable returned null so using default totalSegments : {}", parallelScanTotalSegments); + } else { + final double tableSizeGB = (double) describeTableResponse.table().tableSizeBytes() / NUMBER_OF_BYTES_PER_GB; + parallelScanTotalSegments = Math.min( + Math.max((int) Math.ceil(tableSizeGB / GB_PER_SEGMENT), MIN_SCAN_SEGMENTS), MAX_SCAN_SEGMENTS); + + log.info("TotalSegments for Lease table parallel scan : {}", parallelScanTotalSegments); + } + cachedTotalSegments = parallelScanTotalSegments; + expirationTimeForTotalSegmentsCache = Instant.now().plus(CACHE_DURATION_FOR_TOTAL_SEGMENTS); return parallelScanTotalSegments; } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java index 5397e56c..aa3e95d1 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java @@ -401,14 +401,14 @@ class DynamoDBLeaseRefresherTest { public void listLeasesParallely_UseCachedTotalSegment() throws ProvisionedThroughputException, DependencyException, InvalidStateException { DynamoDbAsyncClient mockDdbClient = mock(DynamoDbAsyncClient.class); - final long oneGB_InBytes = 1073741824L; + final long oneGBInBytes = 1073741824L; when(mockDdbClient.describeTable(any(DescribeTableRequest.class))) .thenReturn(CompletableFuture.completedFuture(DescribeTableResponse.builder() .table(TableDescription.builder() .tableName(TEST_LEASE_TABLE) .tableStatus(TableStatus.ACTIVE) - .tableSizeBytes(oneGB_InBytes) + .tableSizeBytes(oneGBInBytes) .build()) .build())); when(mockDdbClient.scan(any(ScanRequest.class)))