Calculate scan segment by table size (#1443)

* Fixes DDB usage spike issue

* Removed un-necessary exception handling

* made max total segment size 30

* Cached total scan segment

* Reuse listLeasesParallely api to dynamically calculate total segment

* Added unit tests and made getParallelScanTotalSegments synchronous

* Simplified getParallelScanTotalSegments method

* Fall back to previously calculated totalScan

* fixed formating
This commit is contained in:
ehasah-aws 2025-03-12 10:10:19 -07:00 committed by GitHub
parent e856e7c95e
commit 7b5ebaeb93
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 216 additions and 11 deletions

View file

@ -86,11 +86,6 @@ public final class LeaseAssignmentManager {
*/
private static final int DEFAULT_LEASE_ASSIGNMENT_MANAGER_FREQ_MULTIPLIER = 2;
/**
* Default parallelism factor for scaling lease table.
*/
private static final int DEFAULT_LEASE_TABLE_SCAN_PARALLELISM_FACTOR = 10;
private static final String FORCE_LEADER_RELEASE_METRIC_NAME = "ForceLeaderRelease";
/**
@ -689,8 +684,8 @@ public final class LeaseAssignmentManager {
}
private CompletableFuture<Map.Entry<List<Lease>, List<String>>> loadLeaseListAsync() {
return CompletableFuture.supplyAsync(() -> loadWithRetry(() -> leaseRefresher.listLeasesParallely(
LEASE_ASSIGNMENT_CALL_THREAD_POOL, DEFAULT_LEASE_TABLE_SCAN_PARALLELISM_FACTOR)));
return CompletableFuture.supplyAsync(() ->
loadWithRetry(() -> leaseRefresher.listLeasesParallely(LEASE_ASSIGNMENT_CALL_THREAD_POOL, 0)));
}
private <T> T loadWithRetry(final Callable<T> loadFunction) {

View file

@ -154,8 +154,9 @@ public interface LeaseRefresher {
* List all leases from the storage parallely and deserialize into Lease objects. Returns the list of leaseKey
* that failed deserialize separately.
*
* @param threadPool threadpool to use for parallel scan
* @param parallelismFactor no. of parallel scans
* @param threadPool thread pool to use for parallel scan
* @param parallelismFactor no. of parallel scans.
* If parallelismFactor is 0 then parallelismFactor will be calculated based on table size
* @return Pair of List of leases from the storage and List of items failed to deserialize
* @throws DependencyException if DynamoDB scan fails in an unexpected way
* @throws InvalidStateException if lease table does not exist

View file

@ -15,6 +15,7 @@
package software.amazon.kinesis.leases.dynamodb;
import java.time.Duration;
import java.time.Instant;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
@ -122,6 +123,20 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
private static final String LEASE_OWNER_INDEX_QUERY_CONDITIONAL_EXPRESSION =
String.format("%s = %s", LEASE_OWNER_KEY, DDB_LEASE_OWNER);
/**
* Default parallelism factor for scaling lease table.
*/
private static final int DEFAULT_LEASE_TABLE_SCAN_PARALLELISM_FACTOR = 10;
private static final long NUMBER_OF_BYTES_PER_GB = 1024 * 1024 * 1024;
private static final double GB_PER_SEGMENT = 0.2;
private static final int MIN_SCAN_SEGMENTS = 1;
private static final int MAX_SCAN_SEGMENTS = 30;
private Integer cachedTotalSegments;
private Instant expirationTimeForTotalSegmentsCache;
private static final Duration CACHE_DURATION_FOR_TOTAL_SEGMENTS = Duration.ofHours(2);
private static DdbTableConfig createDdbTableConfigFromBillingMode(final BillingMode billingMode) {
final DdbTableConfig tableConfig = new DdbTableConfig();
tableConfig.billingMode(billingMode);
@ -553,9 +568,17 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
final List<String> leaseItemFailedDeserialize = new ArrayList<>();
final List<Lease> response = new ArrayList<>();
final List<Future<List<Map<String, AttributeValue>>>> futures = new ArrayList<>();
for (int i = 0; i < parallelScanTotalSegment; ++i) {
final int totalSegments;
if (parallelScanTotalSegment > 0) {
totalSegments = parallelScanTotalSegment;
} else {
totalSegments = getParallelScanTotalSegments();
}
for (int i = 0; i < totalSegments; ++i) {
final int segmentNumber = i;
futures.add(parallelScanExecutorService.submit(() -> scanSegment(segmentNumber, parallelScanTotalSegment)));
futures.add(parallelScanExecutorService.submit(() -> scanSegment(segmentNumber, totalSegments)));
}
try {
for (final Future<List<Map<String, AttributeValue>>> future : futures) {
@ -586,6 +609,41 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
return new AbstractMap.SimpleEntry<>(response, leaseItemFailedDeserialize);
}
/**
* Calculates the optimal number of parallel scan segments for a DynamoDB table based on its size.
* The calculation follows these rules:
* - Each segment handles 0.2GB (214,748,364 bytes) of data
* - For empty tables or tables smaller than 0.2GB, uses 1 segment
* - Number of segments scales linearly with table size
*
* @return The number of segments to use for parallel scan, minimum 1
*/
private synchronized int getParallelScanTotalSegments() throws DependencyException {
if (isTotalSegmentsCacheValid()) {
return cachedTotalSegments;
}
int parallelScanTotalSegments =
cachedTotalSegments == null ? DEFAULT_LEASE_TABLE_SCAN_PARALLELISM_FACTOR : cachedTotalSegments;
final DescribeTableResponse describeTableResponse = describeLeaseTable();
if (describeTableResponse == null) {
log.info("DescribeTable returned null so using default totalSegments : {}", parallelScanTotalSegments);
} else {
final double tableSizeGB = (double) describeTableResponse.table().tableSizeBytes() / NUMBER_OF_BYTES_PER_GB;
parallelScanTotalSegments = Math.min(
Math.max((int) Math.ceil(tableSizeGB / GB_PER_SEGMENT), MIN_SCAN_SEGMENTS), MAX_SCAN_SEGMENTS);
log.info("TotalSegments for Lease table parallel scan : {}", parallelScanTotalSegments);
}
cachedTotalSegments = parallelScanTotalSegments;
expirationTimeForTotalSegmentsCache = Instant.now().plus(CACHE_DURATION_FOR_TOTAL_SEGMENTS);
return parallelScanTotalSegments;
}
private boolean isTotalSegmentsCacheValid() {
return cachedTotalSegments != null && Instant.now().isBefore(expirationTimeForTotalSegmentsCache);
}
private List<Map<String, AttributeValue>> scanSegment(final int segment, final int parallelScanTotalSegment)
throws DependencyException {

View file

@ -11,6 +11,8 @@ import java.util.concurrent.Executors;
import com.amazonaws.services.dynamodbv2.local.embedded.DynamoDBEmbedded;
import com.google.common.collect.ImmutableMap;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.mockito.Mockito;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
@ -22,6 +24,8 @@ import software.amazon.awssdk.services.dynamodb.model.GlobalSecondaryIndexDescri
import software.amazon.awssdk.services.dynamodb.model.IndexStatus;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
import software.amazon.awssdk.services.dynamodb.model.TableDescription;
import software.amazon.awssdk.services.dynamodb.model.TableStatus;
import software.amazon.awssdk.services.dynamodb.model.UpdateContinuousBackupsRequest;
@ -393,6 +397,153 @@ class DynamoDBLeaseRefresherTest {
assertEquals("badLeaseKey", response.getValue().get(0));
}
@Test
public void listLeasesParallely_UseCachedTotalSegment()
throws ProvisionedThroughputException, DependencyException, InvalidStateException {
DynamoDbAsyncClient mockDdbClient = mock(DynamoDbAsyncClient.class);
final long oneGBInBytes = 1073741824L;
when(mockDdbClient.describeTable(any(DescribeTableRequest.class)))
.thenReturn(CompletableFuture.completedFuture(DescribeTableResponse.builder()
.table(TableDescription.builder()
.tableName(TEST_LEASE_TABLE)
.tableStatus(TableStatus.ACTIVE)
.tableSizeBytes(oneGBInBytes)
.build())
.build()));
when(mockDdbClient.scan(any(ScanRequest.class)))
.thenReturn(CompletableFuture.completedFuture(
ScanResponse.builder().items(new ArrayList<>()).build()));
final LeaseRefresher leaseRefresher = new DynamoDBLeaseRefresher(
TEST_LEASE_TABLE,
mockDdbClient,
new DynamoDBLeaseSerializer(),
true,
NOOP_TABLE_CREATOR_CALLBACK,
Duration.ofSeconds(10),
new DdbTableConfig(),
true,
true,
new ArrayList<>());
leaseRefresher.listLeasesParallely(Executors.newFixedThreadPool(2), 0);
verify(mockDdbClient, times(5)).scan(any(ScanRequest.class));
// calling second to test cached value is used
leaseRefresher.listLeasesParallely(Executors.newFixedThreadPool(2), 0);
// verify if describe table is called once even when listLeasesParallely is called twice
verify(mockDdbClient, times(1)).describeTable(any(DescribeTableRequest.class));
verify(mockDdbClient, times(10)).scan(any(ScanRequest.class));
}
@Test
public void listLeasesParallely_DescribeTableNotCalledWhenSegmentGreaterThanZero()
throws ProvisionedThroughputException, DependencyException, InvalidStateException {
DynamoDbAsyncClient mockDdbClient = mock(DynamoDbAsyncClient.class);
when(mockDdbClient.describeTable(any(DescribeTableRequest.class)))
.thenReturn(CompletableFuture.completedFuture(DescribeTableResponse.builder()
.table(TableDescription.builder()
.tableName(TEST_LEASE_TABLE)
.tableStatus(TableStatus.ACTIVE)
.tableSizeBytes(1000L)
.build())
.build()));
when(mockDdbClient.scan(any(ScanRequest.class)))
.thenReturn(CompletableFuture.completedFuture(
ScanResponse.builder().items(new ArrayList<>()).build()));
final LeaseRefresher leaseRefresher = new DynamoDBLeaseRefresher(
TEST_LEASE_TABLE,
mockDdbClient,
new DynamoDBLeaseSerializer(),
true,
NOOP_TABLE_CREATOR_CALLBACK,
Duration.ofSeconds(10),
new DdbTableConfig(),
true,
true,
new ArrayList<>());
leaseRefresher.listLeasesParallely(Executors.newFixedThreadPool(2), 2);
verify(mockDdbClient, times(0)).describeTable(any(DescribeTableRequest.class));
}
@Test
public void listLeasesParallely_TotalSegmentIsDefaultWhenDescribeTableThrowsException()
throws ProvisionedThroughputException, DependencyException, InvalidStateException {
DynamoDbAsyncClient mockDdbClient = mock(DynamoDbAsyncClient.class);
when(mockDdbClient.describeTable(any(DescribeTableRequest.class)))
.thenThrow(ResourceNotFoundException.builder()
.message("Mock table does not exist scenario")
.build());
when(mockDdbClient.scan(any(ScanRequest.class)))
.thenReturn(CompletableFuture.completedFuture(
ScanResponse.builder().items(new ArrayList<>()).build()));
final LeaseRefresher leaseRefresher = new DynamoDBLeaseRefresher(
TEST_LEASE_TABLE,
mockDdbClient,
new DynamoDBLeaseSerializer(),
true,
NOOP_TABLE_CREATOR_CALLBACK,
Duration.ofSeconds(10),
new DdbTableConfig(),
true,
true,
new ArrayList<>());
leaseRefresher.listLeasesParallely(Executors.newFixedThreadPool(2), 0);
verify(mockDdbClient, times(10)).scan(any(ScanRequest.class));
}
@ParameterizedTest
@CsvSource({
"0, 1", // 0
"1024, 1", // 1KB
"104857600, 1", // 100MB
"214748364, 1", // 0.2GB
"322122547, 2", // 1.3GB
"1073741824, 5", // 1GB
"2147483648, 10", // 2GB
"5368709120, 25", // 5GB
})
public void listLeasesParallely_TotalSegmentForDifferentTableSize(long tableSizeBytes, int totalSegments)
throws ProvisionedThroughputException, DependencyException, InvalidStateException {
DynamoDbAsyncClient mockDdbClient = mock(DynamoDbAsyncClient.class);
when(mockDdbClient.describeTable(any(DescribeTableRequest.class)))
.thenReturn(CompletableFuture.completedFuture(DescribeTableResponse.builder()
.table(TableDescription.builder()
.tableName(TEST_LEASE_TABLE)
.tableStatus(TableStatus.ACTIVE)
.tableSizeBytes(tableSizeBytes)
.build())
.build()));
when(mockDdbClient.scan(any(ScanRequest.class)))
.thenReturn(CompletableFuture.completedFuture(
ScanResponse.builder().items(new ArrayList<>()).build()));
final LeaseRefresher leaseRefresher = new DynamoDBLeaseRefresher(
TEST_LEASE_TABLE,
mockDdbClient,
new DynamoDBLeaseSerializer(),
true,
NOOP_TABLE_CREATOR_CALLBACK,
Duration.ofSeconds(10),
new DdbTableConfig(),
true,
true,
new ArrayList<>());
leaseRefresher.listLeasesParallely(Executors.newFixedThreadPool(2), 0);
verify(mockDdbClient, times(totalSegments)).scan(any(ScanRequest.class));
}
@Test
void initiateGracefulLeaseHandoff_sanity() throws Exception {
DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient);