Simplified getParallelScanTotalSegments method

This commit is contained in:
eha sah 2025-03-11 09:59:55 -07:00
parent cad4467c22
commit 4a59562cb7
2 changed files with 17 additions and 19 deletions

View file

@ -136,7 +136,6 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
private Integer cachedTotalSegments; private Integer cachedTotalSegments;
private Instant expirationTimeForTotalSegmentsCache; private Instant expirationTimeForTotalSegmentsCache;
private static final Duration CACHE_DURATION_FOR_TOTAL_SEGMENTS = Duration.ofHours(2); private static final Duration CACHE_DURATION_FOR_TOTAL_SEGMENTS = Duration.ofHours(2);
private final Object lock = new Object();
private static DdbTableConfig createDdbTableConfigFromBillingMode(final BillingMode billingMode) { private static DdbTableConfig createDdbTableConfigFromBillingMode(final BillingMode billingMode) {
final DdbTableConfig tableConfig = new DdbTableConfig(); final DdbTableConfig tableConfig = new DdbTableConfig();
@ -619,12 +618,11 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
* *
* @return The number of segments to use for parallel scan, minimum 1 * @return The number of segments to use for parallel scan, minimum 1
*/ */
private int getParallelScanTotalSegments() throws DependencyException { private synchronized int getParallelScanTotalSegments() throws DependencyException {
if (isTotalSegmentsCacheValid()) { if (isTotalSegmentsCacheValid()) {
return cachedTotalSegments; return cachedTotalSegments;
} }
int parallelScanTotalSegments = DEFAULT_LEASE_TABLE_SCAN_PARALLELISM_FACTOR; int parallelScanTotalSegments = DEFAULT_LEASE_TABLE_SCAN_PARALLELISM_FACTOR;
synchronized(lock) {
if (isTotalSegmentsCacheValid()) { if (isTotalSegmentsCacheValid()) {
return cachedTotalSegments; return cachedTotalSegments;
} }
@ -635,11 +633,11 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
final double tableSizeGB = (double) describeTableResponse.table().tableSizeBytes() / NUMBER_OF_BYTES_PER_GB; final double tableSizeGB = (double) describeTableResponse.table().tableSizeBytes() / NUMBER_OF_BYTES_PER_GB;
parallelScanTotalSegments = Math.min( parallelScanTotalSegments = Math.min(
Math.max((int) Math.ceil(tableSizeGB / GB_PER_SEGMENT), MIN_SCAN_SEGMENTS), MAX_SCAN_SEGMENTS); Math.max((int) Math.ceil(tableSizeGB / GB_PER_SEGMENT), MIN_SCAN_SEGMENTS), MAX_SCAN_SEGMENTS);
log.info("TotalSegments for Lease table parallel scan : {}", parallelScanTotalSegments);
} }
cachedTotalSegments = parallelScanTotalSegments; cachedTotalSegments = parallelScanTotalSegments;
expirationTimeForTotalSegmentsCache = Instant.now().plus(CACHE_DURATION_FOR_TOTAL_SEGMENTS); expirationTimeForTotalSegmentsCache = Instant.now().plus(CACHE_DURATION_FOR_TOTAL_SEGMENTS);
}
log.info("TotalSegments for Lease table parallel scan : {}", parallelScanTotalSegments);
return parallelScanTotalSegments; return parallelScanTotalSegments;
} }

View file

@ -401,14 +401,14 @@ class DynamoDBLeaseRefresherTest {
public void listLeasesParallely_UseCachedTotalSegment() public void listLeasesParallely_UseCachedTotalSegment()
throws ProvisionedThroughputException, DependencyException, InvalidStateException { throws ProvisionedThroughputException, DependencyException, InvalidStateException {
DynamoDbAsyncClient mockDdbClient = mock(DynamoDbAsyncClient.class); DynamoDbAsyncClient mockDdbClient = mock(DynamoDbAsyncClient.class);
final long oneGB_InBytes = 1073741824L; final long oneGBInBytes = 1073741824L;
when(mockDdbClient.describeTable(any(DescribeTableRequest.class))) when(mockDdbClient.describeTable(any(DescribeTableRequest.class)))
.thenReturn(CompletableFuture.completedFuture(DescribeTableResponse.builder() .thenReturn(CompletableFuture.completedFuture(DescribeTableResponse.builder()
.table(TableDescription.builder() .table(TableDescription.builder()
.tableName(TEST_LEASE_TABLE) .tableName(TEST_LEASE_TABLE)
.tableStatus(TableStatus.ACTIVE) .tableStatus(TableStatus.ACTIVE)
.tableSizeBytes(oneGB_InBytes) .tableSizeBytes(oneGBInBytes)
.build()) .build())
.build())); .build()));
when(mockDdbClient.scan(any(ScanRequest.class))) when(mockDdbClient.scan(any(ScanRequest.class)))