diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java index c651af91..1ce2a033 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java @@ -133,9 +133,10 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { private static final int MIN_SCAN_SEGMENTS = 1; private static final int MAX_SCAN_SEGMENTS = 30; - private volatile Integer cachedTotalSegments; - private volatile Instant expirationTimeForTotalSegmentsCache; + private Integer cachedTotalSegments; + private Instant expirationTimeForTotalSegmentsCache; private static final Duration CACHE_DURATION_FOR_TOTAL_SEGMENTS = Duration.ofHours(2); + private final Object lock = new Object(); private static DdbTableConfig createDdbTableConfigFromBillingMode(final BillingMode billingMode) { final DdbTableConfig tableConfig = new DdbTableConfig(); @@ -620,23 +621,25 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { */ private int getParallelScanTotalSegments() throws DependencyException { if (isTotalSegmentsCacheValid()) { - log.info("Cached value used : TotalSegments for Lease table parallel scan : {}", cachedTotalSegments); return cachedTotalSegments; } - int parallelScanTotalSegments = DEFAULT_LEASE_TABLE_SCAN_PARALLELISM_FACTOR; - DescribeTableResponse describeTableResponse = describeLeaseTable(); - - if (describeTableResponse == null) { - log.info("DescribeTable returned null so using default totalSegments : {}", parallelScanTotalSegments); - } else { - final double tableSizeGB = (double) describeTableResponse.table().tableSizeBytes() / NUMBER_OF_BYTES_PER_GB; - parallelScanTotalSegments = Math.min( - Math.max((int) Math.ceil(tableSizeGB / GB_PER_SEGMENT), MIN_SCAN_SEGMENTS), MAX_SCAN_SEGMENTS); - log.info("TotalSegments for Lease table parallel scan : {}", parallelScanTotalSegments); + synchronized(lock) { + if (isTotalSegmentsCacheValid()) { + return cachedTotalSegments; + } + DescribeTableResponse describeTableResponse = describeLeaseTable(); + if (describeTableResponse == null) { + log.info("DescribeTable returned null so using default totalSegments : {}", parallelScanTotalSegments); + } else { + final double tableSizeGB = (double) describeTableResponse.table().tableSizeBytes() / NUMBER_OF_BYTES_PER_GB; + parallelScanTotalSegments = Math.min( + Math.max((int) Math.ceil(tableSizeGB / GB_PER_SEGMENT), MIN_SCAN_SEGMENTS), MAX_SCAN_SEGMENTS); + } + cachedTotalSegments = parallelScanTotalSegments; + expirationTimeForTotalSegmentsCache = Instant.now().plus(CACHE_DURATION_FOR_TOTAL_SEGMENTS); } - cachedTotalSegments = parallelScanTotalSegments; - expirationTimeForTotalSegmentsCache = Instant.now().plus(CACHE_DURATION_FOR_TOTAL_SEGMENTS); + log.info("TotalSegments for Lease table parallel scan : {}", parallelScanTotalSegments); return parallelScanTotalSegments; } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManagerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManagerTest.java index 21c54e75..6bc3a581 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManagerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManagerTest.java @@ -44,7 +44,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyInt; @@ -757,39 +756,6 @@ class LeaseAssignmentManagerTest { verify(mockedWorkerMetricsDAO, times(2)).getAllWorkerMetricStats(); } - @Test - void performAssignment_testRetryBehavior_withDynamicTotalScanSegments() - throws ProvisionedThroughputException, InvalidStateException, DependencyException { - - final WorkerMetricStatsDAO mockedWorkerMetricsDAO = Mockito.mock(WorkerMetricStatsDAO.class); - final LeaseRefresher mockedLeaseRefresher = Mockito.mock(LeaseRefresher.class); - - when(mockedLeaseRefresher.listLeasesParallely(any(), eq(0))).thenThrow(new RuntimeException()); - when(mockedWorkerMetricsDAO.getAllWorkerMetricStats()).thenThrow(new RuntimeException()); - - final LeaseAssignmentManager leaseAssignmentManager = new LeaseAssignmentManager( - mockedLeaseRefresher, - mockedWorkerMetricsDAO, - mockLeaderDecider, - getWorkerUtilizationAwareAssignmentConfig(Double.MAX_VALUE, 20), - TEST_LEADER_WORKER_ID, - 100L, - new NullMetricsFactory(), - scheduledExecutorService, - System::nanoTime, - Integer.MAX_VALUE, - LeaseManagementConfig.GracefulLeaseHandoffConfig.builder() - .isGracefulLeaseHandoffEnabled(false) - .build()); - - leaseAssignmentManager.start(); - - leaseAssignmentManagerRunnable.run(); - - verify(mockedLeaseRefresher, times(2)).listLeasesParallely(any(), eq(0)); - verify(mockedWorkerMetricsDAO, times(2)).getAllWorkerMetricStats(); - } - @Test void performAssignment_invalidLeaseInTable_validateAssignmentDoesNotFail() throws Exception { createLeaseAssignmentManager( diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java index 218d9e54..5397e56c 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java @@ -11,6 +11,8 @@ import java.util.concurrent.Executors; import com.amazonaws.services.dynamodbv2.local.embedded.DynamoDBEmbedded; import com.google.common.collect.ImmutableMap; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; import org.mockito.Mockito; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.model.AttributeValue; @@ -22,6 +24,8 @@ import software.amazon.awssdk.services.dynamodb.model.GlobalSecondaryIndexDescri import software.amazon.awssdk.services.dynamodb.model.IndexStatus; import software.amazon.awssdk.services.dynamodb.model.PutItemRequest; import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException; +import software.amazon.awssdk.services.dynamodb.model.ScanRequest; +import software.amazon.awssdk.services.dynamodb.model.ScanResponse; import software.amazon.awssdk.services.dynamodb.model.TableDescription; import software.amazon.awssdk.services.dynamodb.model.TableStatus; import software.amazon.awssdk.services.dynamodb.model.UpdateContinuousBackupsRequest; @@ -394,31 +398,150 @@ class DynamoDBLeaseRefresherTest { } @Test - void listLeasesParallelyWithDynamicTotalSegments_sanity() + public void listLeasesParallely_UseCachedTotalSegment() throws ProvisionedThroughputException, DependencyException, InvalidStateException { - DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient); - setupTable(leaseRefresher); - leaseRefresher.createLeaseIfNotExists(createDummyLease("lease1", "leaseOwner1")); - leaseRefresher.createLeaseIfNotExists(createDummyLease("lease2", "leaseOwner2")); - final Map.Entry, List> response = - leaseRefresher.listLeasesParallely(Executors.newFixedThreadPool(2), 0); - assertEquals(2, response.getKey().size()); - assertEquals(0, response.getValue().size()); + DynamoDbAsyncClient mockDdbClient = mock(DynamoDbAsyncClient.class); + final long oneGB_InBytes = 1073741824L; + + when(mockDdbClient.describeTable(any(DescribeTableRequest.class))) + .thenReturn(CompletableFuture.completedFuture(DescribeTableResponse.builder() + .table(TableDescription.builder() + .tableName(TEST_LEASE_TABLE) + .tableStatus(TableStatus.ACTIVE) + .tableSizeBytes(oneGB_InBytes) + .build()) + .build())); + when(mockDdbClient.scan(any(ScanRequest.class))) + .thenReturn(CompletableFuture.completedFuture( + ScanResponse.builder().items(new ArrayList<>()).build())); + + final LeaseRefresher leaseRefresher = new DynamoDBLeaseRefresher( + TEST_LEASE_TABLE, + mockDdbClient, + new DynamoDBLeaseSerializer(), + true, + NOOP_TABLE_CREATOR_CALLBACK, + Duration.ofSeconds(10), + new DdbTableConfig(), + true, + true, + new ArrayList<>()); + + leaseRefresher.listLeasesParallely(Executors.newFixedThreadPool(2), 0); + verify(mockDdbClient, times(5)).scan(any(ScanRequest.class)); + + // calling second to test cached value is used + leaseRefresher.listLeasesParallely(Executors.newFixedThreadPool(2), 0); + + // verify if describe table is called once even when listLeasesParallely is called twice + verify(mockDdbClient, times(1)).describeTable(any(DescribeTableRequest.class)); + verify(mockDdbClient, times(10)).scan(any(ScanRequest.class)); } @Test - void listLeasesParallelyWithDynamicTotalSegments_leaseWithFailingDeserialization_assertCorrectResponse() + public void listLeasesParallely_DescribeTableNotCalledWhenSegmentGreaterThanZero() throws ProvisionedThroughputException, DependencyException, InvalidStateException { - DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient); - setupTable(leaseRefresher); - leaseRefresher.createLeaseIfNotExists(createDummyLease("lease1", "leaseOwner1")); - createAndPutBadLeaseEntryInTable(); - final Map.Entry, List> response = - leaseRefresher.listLeasesParallely(Executors.newFixedThreadPool(2), 0); - assertEquals(1, response.getKey().size()); - assertEquals("lease1", response.getKey().get(0).leaseKey()); - assertEquals(1, response.getValue().size()); - assertEquals("badLeaseKey", response.getValue().get(0)); + DynamoDbAsyncClient mockDdbClient = mock(DynamoDbAsyncClient.class); + + when(mockDdbClient.describeTable(any(DescribeTableRequest.class))) + .thenReturn(CompletableFuture.completedFuture(DescribeTableResponse.builder() + .table(TableDescription.builder() + .tableName(TEST_LEASE_TABLE) + .tableStatus(TableStatus.ACTIVE) + .tableSizeBytes(1000L) + .build()) + .build())); + when(mockDdbClient.scan(any(ScanRequest.class))) + .thenReturn(CompletableFuture.completedFuture( + ScanResponse.builder().items(new ArrayList<>()).build())); + + final LeaseRefresher leaseRefresher = new DynamoDBLeaseRefresher( + TEST_LEASE_TABLE, + mockDdbClient, + new DynamoDBLeaseSerializer(), + true, + NOOP_TABLE_CREATOR_CALLBACK, + Duration.ofSeconds(10), + new DdbTableConfig(), + true, + true, + new ArrayList<>()); + + leaseRefresher.listLeasesParallely(Executors.newFixedThreadPool(2), 2); + verify(mockDdbClient, times(0)).describeTable(any(DescribeTableRequest.class)); + } + + @Test + public void listLeasesParallely_TotalSegmentIsDefaultWhenDescribeTableThrowsException() + throws ProvisionedThroughputException, DependencyException, InvalidStateException { + DynamoDbAsyncClient mockDdbClient = mock(DynamoDbAsyncClient.class); + + when(mockDdbClient.describeTable(any(DescribeTableRequest.class))) + .thenThrow(ResourceNotFoundException.builder() + .message("Mock table does not exist scenario") + .build()); + + when(mockDdbClient.scan(any(ScanRequest.class))) + .thenReturn(CompletableFuture.completedFuture( + ScanResponse.builder().items(new ArrayList<>()).build())); + + final LeaseRefresher leaseRefresher = new DynamoDBLeaseRefresher( + TEST_LEASE_TABLE, + mockDdbClient, + new DynamoDBLeaseSerializer(), + true, + NOOP_TABLE_CREATOR_CALLBACK, + Duration.ofSeconds(10), + new DdbTableConfig(), + true, + true, + new ArrayList<>()); + + leaseRefresher.listLeasesParallely(Executors.newFixedThreadPool(2), 0); + verify(mockDdbClient, times(10)).scan(any(ScanRequest.class)); + } + + @ParameterizedTest + @CsvSource({ + "0, 1", // 0 + "1024, 1", // 1KB + "104857600, 1", // 100MB + "214748364, 1", // 0.2GB + "322122547, 2", // 1.3GB + "1073741824, 5", // 1GB + "2147483648, 10", // 2GB + "5368709120, 25", // 5GB + }) + public void listLeasesParallely_TotalSegmentForDifferentTableSize(long tableSizeBytes, int totalSegments) + throws ProvisionedThroughputException, DependencyException, InvalidStateException { + DynamoDbAsyncClient mockDdbClient = mock(DynamoDbAsyncClient.class); + + when(mockDdbClient.describeTable(any(DescribeTableRequest.class))) + .thenReturn(CompletableFuture.completedFuture(DescribeTableResponse.builder() + .table(TableDescription.builder() + .tableName(TEST_LEASE_TABLE) + .tableStatus(TableStatus.ACTIVE) + .tableSizeBytes(tableSizeBytes) + .build()) + .build())); + when(mockDdbClient.scan(any(ScanRequest.class))) + .thenReturn(CompletableFuture.completedFuture( + ScanResponse.builder().items(new ArrayList<>()).build())); + + final LeaseRefresher leaseRefresher = new DynamoDBLeaseRefresher( + TEST_LEASE_TABLE, + mockDdbClient, + new DynamoDBLeaseSerializer(), + true, + NOOP_TABLE_CREATOR_CALLBACK, + Duration.ofSeconds(10), + new DdbTableConfig(), + true, + true, + new ArrayList<>()); + + leaseRefresher.listLeasesParallely(Executors.newFixedThreadPool(2), 0); + verify(mockDdbClient, times(totalSegments)).scan(any(ScanRequest.class)); } @Test