Make LAM run at configurable interval, tie LeaseDiscoverer with LAM run and tie load balancing with leaseDuration

This commit is contained in:
eha sah 2025-04-08 19:27:24 -07:00
parent 133374706c
commit 6049ef1e53
11 changed files with 333 additions and 29 deletions

View file

@ -249,7 +249,7 @@ public class PropertiesMappingE2ETest {
.workerUtilizationAwareAssignmentConfig()
.staleWorkerMetricsEntryCleanupDuration());
assertEquals(
3,
6,
kclV3Config
.leaseManagementConfig
.workerUtilizationAwareAssignmentConfig()

View file

@ -0,0 +1,77 @@
import java.util.UUID;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.leases.LeaseManagementConfig;
import software.amazon.kinesis.worker.RecordProcessorFactory;
public class SampleApp {
private final String streamName;
private final Region region;
private final KinesisAsyncClient kinesisClient;
public static void main(String[] args) {
String streamName = "DDB-Scan-usage-test";
Region region = Region.US_EAST_1;
new SampleApp(streamName, region).run();
}
public SampleApp(String streamName, Region region) {
this.streamName = streamName;
this.region = region;
this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(
KinesisAsyncClient.builder().region(this.region));
}
public void run() {
DynamoDbAsyncClient dynamoDbAsyncClient =
DynamoDbAsyncClient.builder().region(region).build();
CloudWatchAsyncClient cloudWatchClient =
CloudWatchAsyncClient.builder().region(region).build();
ConfigsBuilder configsBuilder = new ConfigsBuilder(
streamName,
streamName,
kinesisClient,
dynamoDbAsyncClient,
cloudWatchClient,
UUID.randomUUID().toString(),
new RecordProcessorFactory());
LeaseManagementConfig leaseManagementConfig = configsBuilder.leaseManagementConfig();
leaseManagementConfig.workerUtilizationAwareAssignmentConfig().varianceBalancingFrequency(5);
// failoverTimeMillis = 10 min
leaseManagementConfig.failoverTimeMillis(600000); // 10 minute
// RetrievalConfig config = configsBuilder.retrievalConfig();
// PollingConfig pollingConfig = new PollingConfig(config.kinesisClient());
//
// //idleTimeBetweenReadsInMillis = 200
// pollingConfig.idleTimeBetweenReadsInMillis(200);
//
// config.retrievalSpecificConfig();
//
// //reBalanceThresholdPercentage = 4
// leaseManagementConfig.workerUtilizationAwareAssignmentConfig().reBalanceThresholdPercentage(4);
//
// // maxGetRecordsThreadPool = 10 ??
Scheduler scheduler = new Scheduler(
configsBuilder.checkpointConfig(),
configsBuilder.coordinatorConfig(),
leaseManagementConfig,
configsBuilder.lifecycleConfig(),
configsBuilder.metricsConfig(),
configsBuilder.processorConfig(),
configsBuilder.retrievalConfig());
Thread schedulerThread = new Thread(scheduler);
schedulerThread.setDaemon(true);
schedulerThread.start();
}
}

View file

@ -421,7 +421,8 @@ public class Scheduler implements Runnable {
lamThreadPool,
System::nanoTime,
leaseManagementConfig.maxLeasesForWorker(),
leaseManagementConfig.gracefulLeaseHandoffConfig()))
leaseManagementConfig.gracefulLeaseHandoffConfig(),
leaseManagementConfig.leaseAssignmentIntervalMillis()))
.adaptiveLeaderDeciderCreator(() -> new MigrationAdaptiveLeaderDecider(metricsFactory))
.deterministicLeaderDeciderCreator(() -> new DeterministicShuffleShardSyncLeaderDecider(
leaseRefresher, Executors.newSingleThreadScheduledExecutor(), 1, metricsFactory))

View file

@ -117,11 +117,13 @@ public final class LeaseAssignmentManager {
private final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig;
private boolean tookOverLeadershipInThisRun = false;
private final Map<String, Lease> prevRunLeasesState = new HashMap<>();
private final long leaseAssignmentIntervalMillis;
private Future<?> managerFuture;
private int noOfContinuousFailedAttempts = 0;
private int lamRunCounter = 0;
private long varianceBasedBalancingLastRunTime;
public synchronized void start() {
if (isNull(managerFuture)) {
@ -129,10 +131,7 @@ public final class LeaseAssignmentManager {
// so reset the flag to refresh the state before processing during a restart of LAM.
tookOverLeadershipInThisRun = false;
managerFuture = executorService.scheduleWithFixedDelay(
this::performAssignment,
0L,
leaseDurationMillis * DEFAULT_LEASE_ASSIGNMENT_MANAGER_FREQ_MULTIPLIER,
TimeUnit.MILLISECONDS);
this::performAssignment, 0L, (int) (leaseAssignmentIntervalMillis), TimeUnit.MILLISECONDS);
log.info("Started LeaseAssignmentManager");
return;
}
@ -240,6 +239,7 @@ public final class LeaseAssignmentManager {
if (shouldRunVarianceBalancing()) {
final long balanceWorkerVarianceStartTime = System.currentTimeMillis();
this.varianceBasedBalancingLastRunTime = balanceWorkerVarianceStartTime;
final int totalNewAssignmentBeforeWorkerVarianceBalancing =
inMemoryStorageView.leaseToNewAssignedWorkerMap.size();
leaseAssignmentDecider.balanceWorkerVariance();
@ -280,14 +280,15 @@ public final class LeaseAssignmentManager {
}
private boolean shouldRunVarianceBalancing() {
final boolean response = this.lamRunCounter == 0;
/*
To avoid lamRunCounter grow large, keep it within [0,varianceBalancingFrequency).
If varianceBalancingFrequency is 5 lamRunCounter value will be within 0 to 4 and method return true when
lamRunCounter is 0.
*/
this.lamRunCounter = (this.lamRunCounter + 1) % config.varianceBalancingFrequency();
return response;
final long now = System.currentTimeMillis();
final long varianceBalancingInterval = leaseDurationMillis * config.varianceBalancingFrequency();
if (now - this.varianceBasedBalancingLastRunTime >= varianceBalancingInterval) {
this.varianceBasedBalancingLastRunTime = now;
return true;
}
return false;
}
/**

View file

@ -114,6 +114,16 @@ public class LeaseManagementConfig {
*/
private long failoverTimeMillis = 10000L;
/**
* Lease assignment interval in milliseconds - e.g. wait for this long between Lease assignment run.
*
* <p>Default value: 2 * {@link LeaseManagementConfig#failoverTimeMillis}</p>
*/
private Long leaseAssignmentIntervalMillis;
public long leaseAssignmentIntervalMillis() {
return leaseAssignmentIntervalMillis != null ? leaseAssignmentIntervalMillis : 2 * failoverTimeMillis;
}
/**
* Whether workers should take very expired leases at priority. A very expired lease is when a worker does not
* renew its lease in 3 * {@link LeaseManagementConfig#failoverTimeMillis}. Very expired leases will be taken at
@ -489,7 +499,8 @@ public class LeaseManagementConfig {
isMultiStreamingMode,
leaseCleanupConfig(),
workerUtilizationAwareAssignmentConfig(),
gracefulLeaseHandoffConfig);
gracefulLeaseHandoffConfig,
leaseAssignmentIntervalMillis());
}
return leaseManagementFactory;
}
@ -568,13 +579,13 @@ public class LeaseManagementConfig {
private WorkerMetricsTableConfig workerMetricsTableConfig;
/**
* Frequency to perform worker variance balancing. This value is used with respect to the LAM frequency,
* that is every third (as default) iteration of LAM the worker variance balancing will be performed.
* Setting it to 1 will make varianceBalancing run on every iteration of LAM and 2 on every 2nd iteration
* Frequency to perform worker variance balancing. This value is used with respect to the failoverTimeMillis,
* that is every six (as default) * failoverTimeMillis the worker variance balancing will be performed.
* Setting it to 1 will make varianceBalancing run on every failoverTimeMillis and 2 on every 2 * failoverTimeMillis
* and so on.
* NOTE: LAM frequency = failoverTimeMillis
* NOTE: LAM frequency = {@link LeaseManagementConfig#leaseAssignmentIntervalMillis}
*/
private int varianceBalancingFrequency = 3;
private int varianceBalancingFrequency = 6;
/**
* Alpha value used for calculating exponential moving average of worker's metricStats. Selecting

View file

@ -143,7 +143,8 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
final MetricsFactory metricsFactory,
final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig workerUtilizationAwareAssignmentConfig,
final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig,
final ConcurrentMap<ShardInfo, ShardConsumer> shardInfoShardConsumerMap) {
final ConcurrentMap<ShardInfo, ShardConsumer> shardInfoShardConsumerMap,
final long leaseAssignmentIntervalMillis) {
this.leaseRefresher = leaseRefresher;
this.leaseRenewalThreadpool = createExecutorService(maxLeaseRenewerThreadCount, LEASE_RENEWAL_THREAD_FACTORY);
this.leaseTaker = new DynamoDBLeaseTaker(leaseRefresher, workerIdentifier, leaseDurationMillis, metricsFactory)
@ -152,8 +153,8 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
.withEnablePriorityLeaseAssignment(enablePriorityLeaseAssignment);
this.renewerIntervalMillis = getRenewerTakerIntervalMillis(leaseDurationMillis, epsilonMillis);
this.takerIntervalMillis = (leaseDurationMillis + epsilonMillis) * 2;
// Should run once every leaseDurationMillis to identify new leases before expiry.
this.leaseDiscovererIntervalMillis = leaseDurationMillis - epsilonMillis;
// Should run twice every leaseAssignmentIntervalMillis to identify new leases before expiry.
this.leaseDiscovererIntervalMillis = (leaseAssignmentIntervalMillis - epsilonMillis) / 2;
this.leaseStatsRecorder = new LeaseStatsRecorder(renewerIntervalMillis, System::currentTimeMillis);
this.leaseGracefulShutdownHandler = LeaseGracefulShutdownHandler.create(
gracefulLeaseHandoffConfig.gracefulLeaseHandoffTimeoutMillis(), shardInfoShardConsumerMap, this);

View file

@ -108,6 +108,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
private final boolean isMultiStreamMode;
private final LeaseCleanupConfig leaseCleanupConfig;
private final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig;
private final long leaseAssignmentIntervalMillis;
/**
* Constructor.
@ -144,6 +145,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
* @param leaseCleanupConfig
* @param workerUtilizationAwareAssignmentConfig
* @param gracefulLeaseHandoffConfig
* @param leaseAssignmentIntervalMillis
*/
public DynamoDBLeaseManagementFactory(
final @NotNull KinesisAsyncClient kinesisClient,
@ -179,7 +181,8 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
boolean isMultiStreamMode,
final LeaseCleanupConfig leaseCleanupConfig,
final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig workerUtilizationAwareAssignmentConfig,
final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig) {
final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig,
long leaseAssignmentIntervalMillis) {
this.kinesisClient = kinesisClient;
this.dynamoDBClient = dynamoDBClient;
this.tableName = tableName;
@ -214,6 +217,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
this.tags = tags;
this.workerUtilizationAwareAssignmentConfig = workerUtilizationAwareAssignmentConfig;
this.gracefulLeaseHandoffConfig = gracefulLeaseHandoffConfig;
this.leaseAssignmentIntervalMillis = leaseAssignmentIntervalMillis;
}
@Override
@ -239,7 +243,8 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
metricsFactory,
workerUtilizationAwareAssignmentConfig,
gracefulLeaseHandoffConfig,
shardInfoShardConsumerMap);
shardInfoShardConsumerMap,
leaseAssignmentIntervalMillis);
}
/**

View file

@ -0,0 +1,84 @@
package software.amazon.kinesis.worker;
import org.slf4j.MDC;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.ShardRecordProcessor;
public class RecordProcessor implements ShardRecordProcessor {
private static final String SHARD_ID_MDC_KEY = "ShardId";
private String shardId;
@Override
public void initialize(InitializationInput initializationInput) {
shardId = initializationInput.shardId();
MDC.put(SHARD_ID_MDC_KEY, shardId);
try {
// log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
} finally {
MDC.remove(SHARD_ID_MDC_KEY);
}
}
@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
MDC.put(SHARD_ID_MDC_KEY, shardId);
try {
// log.info("Processing {} record(s)", processRecordsInput.records().size());
// processRecordsInput
// .records()
// .forEach(
// r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(),
// r.sequenceNumber()));
// Checkpoint periodically
processRecordsInput.checkpointer().checkpoint();
} catch (Throwable t) {
// log.error("Caught throwable while processing records. Aborting.", t);
} finally {
MDC.remove(SHARD_ID_MDC_KEY);
}
}
@Override
public void leaseLost(LeaseLostInput leaseLostInput) {
MDC.put(SHARD_ID_MDC_KEY, shardId);
try {
// log.info("Lost lease, so terminating.");
} finally {
MDC.remove(SHARD_ID_MDC_KEY);
}
}
@Override
public void shardEnded(ShardEndedInput shardEndedInput) {
MDC.put(SHARD_ID_MDC_KEY, shardId);
try {
// log.info("Reached shard end checkpointing.");
shardEndedInput.checkpointer().checkpoint();
} catch (ShutdownException | InvalidStateException e) {
// log.error("Exception while checkpointing at shard end. Giving up.", e);
} finally {
MDC.remove(SHARD_ID_MDC_KEY);
}
}
@Override
public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
MDC.put(SHARD_ID_MDC_KEY, shardId);
try {
// log.info("Scheduler is shutting down, checkpointing.");
shutdownRequestedInput.checkpointer().checkpoint();
} catch (ShutdownException | InvalidStateException e) {
// log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
} finally {
MDC.remove(SHARD_ID_MDC_KEY);
}
}
}

View file

@ -0,0 +1,10 @@
package software.amazon.kinesis.worker;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
public class RecordProcessorFactory implements ShardRecordProcessorFactory {
public ShardRecordProcessor shardRecordProcessor() {
return new RecordProcessor();
}
}

View file

@ -17,6 +17,8 @@ import lombok.var;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.mockito.Mockito;
import software.amazon.awssdk.core.util.DefaultSdkAutoConstructList;
import software.amazon.awssdk.enhanced.dynamodb.TableSchema;
@ -49,6 +51,7 @@ import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ -430,7 +433,8 @@ class LeaseAssignmentManagerTest {
.anyMatch(lease -> lease.leaseOwner().equals(TEST_YIELD_WORKER_ID + "2")));
}
@Test
// no needed since variance based load balancing is not longed tied to LAM run
// @Test
void performAssignment_varianceBalanceFreq3_asserLoadBalancingEvery3Iteration() throws Exception {
final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig config =
getWorkerUtilizationAwareAssignmentConfig(Double.MAX_VALUE, 10);
@ -483,6 +487,56 @@ class LeaseAssignmentManagerTest {
.count());
}
@Test
void performAssignment_varianceBalanceFreq3_asserLoadBalancingEveryVarianceBalancingFrequencyLeaseDuration()
throws Exception {
final int varianceBalancingFrequency = 3;
final long leaseDuration = Duration.ofMillis(1000).toMillis();
final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig config =
getWorkerUtilizationAwareAssignmentConfig(Double.MAX_VALUE, 10);
config.varianceBalancingFrequency(varianceBalancingFrequency);
createLeaseAssignmentManager(config, leaseDuration, System::nanoTime, Integer.MAX_VALUE);
long balancingInterval = leaseDuration * varianceBalancingFrequency;
int varianceBalancingOccurred = 0;
// Initial run at time 0
setupConditionForVarianceBalancing();
long startTime = System.currentTimeMillis();
leaseAssignmentManagerRunnable.run();
// Check initial balancing at time 0
long leasesOwnedByWorker = leaseRefresher.listLeases().stream()
.filter(lease -> lease.leaseOwner().equals(TEST_TAKE_WORKER_ID))
.count();
if (leasesOwnedByWorker == 3L) {
varianceBalancingOccurred++;
}
// Run until we see the next balancing since LAM run is not tied to variance-based load balancing
long nextBalancingTime = startTime + balancingInterval;
while (System.currentTimeMillis() < (startTime + balancingInterval + 1000)) {
setupConditionForVarianceBalancing();
leaseAssignmentManagerRunnable.run();
leasesOwnedByWorker = leaseRefresher.listLeases().stream()
.filter(lease -> lease.leaseOwner().equals(TEST_TAKE_WORKER_ID))
.count();
if (leasesOwnedByWorker == 3L && System.currentTimeMillis() >= nextBalancingTime) {
varianceBalancingOccurred++;
}
Thread.sleep(100);
}
assertTrue(
varianceBalancingOccurred == 2,
"Expected varianceBalancingOccurred to be greater than 1, but was: " + varianceBalancingOccurred);
}
private void setupConditionForVarianceBalancing() throws Exception {
workerMetricsDAO.updateMetrics(createDummyYieldWorkerMetrics(TEST_YIELD_WORKER_ID + "1"));
@ -746,7 +800,8 @@ class LeaseAssignmentManagerTest {
Integer.MAX_VALUE,
LeaseManagementConfig.GracefulLeaseHandoffConfig.builder()
.isGracefulLeaseHandoffEnabled(false)
.build());
.build(),
100L);
leaseAssignmentManager.start();
@ -1134,6 +1189,62 @@ class LeaseAssignmentManagerTest {
dynamoDbAsyncClient.putItem(putItemRequest);
}
@Test
void testLeaseAssignmentSchedulingWithDefaultInterval() {
long failoverTimeMillis = 1000L;
ScheduledExecutorService mockExecutor = Mockito.mock(ScheduledExecutorService.class);
LeaseAssignmentManager leaseAssignmentManager = new LeaseAssignmentManager(
leaseRefresher,
workerMetricsDAO,
mockLeaderDecider,
getWorkerUtilizationAwareAssignmentConfig(Double.MAX_VALUE, 20),
TEST_LEADER_WORKER_ID,
failoverTimeMillis,
new NullMetricsFactory(),
mockExecutor,
System::nanoTime,
Integer.MAX_VALUE,
gracefulLeaseHandoffConfig,
2 * failoverTimeMillis);
leaseAssignmentManager.start();
verify(mockExecutor)
.scheduleWithFixedDelay(
any(Runnable.class), eq(0L), eq(2 * failoverTimeMillis), eq(TimeUnit.MILLISECONDS));
}
@ParameterizedTest
@CsvSource({
"1000, 500", // leaseAssignmentInterval smaller than failover
"1000, 1000", // leaseAssignmentInterval equal to failover
"1000, 2000", // leaseAssignmentInterval larger than failover
})
void testLeaseAssignmentWithDifferentIntervals(long failoverTimeMillis, long leaseAssignmentIntervalMillis) {
ScheduledExecutorService mockExecutor = Mockito.mock(ScheduledExecutorService.class);
LeaseAssignmentManager leaseAssignmentManager = new LeaseAssignmentManager(
leaseRefresher,
workerMetricsDAO,
mockLeaderDecider,
getWorkerUtilizationAwareAssignmentConfig(Double.MAX_VALUE, 20),
TEST_LEADER_WORKER_ID,
failoverTimeMillis,
new NullMetricsFactory(),
mockExecutor,
System::nanoTime,
Integer.MAX_VALUE,
gracefulLeaseHandoffConfig,
leaseAssignmentIntervalMillis);
leaseAssignmentManager.start();
verify(mockExecutor)
.scheduleWithFixedDelay(
any(Runnable.class), eq(0L), eq(leaseAssignmentIntervalMillis), eq(TimeUnit.MILLISECONDS));
}
private LeaseAssignmentManager createLeaseAssignmentManager(
final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig config,
final Long leaseDurationMillis,
@ -1151,7 +1262,8 @@ class LeaseAssignmentManagerTest {
scheduledExecutorService,
nanoTimeProvider,
maxLeasesPerWorker,
gracefulLeaseHandoffConfig);
gracefulLeaseHandoffConfig,
2 * leaseDurationMillis);
leaseAssignmentManager.start();
return leaseAssignmentManager;
}

View file

@ -71,6 +71,7 @@ public class DynamoDBLeaseCoordinatorIntegrationTest {
private static DynamoDBLeaseRefresher leaseRefresher;
private static DynamoDBCheckpointer dynamoDBCheckpointer;
private static final long LEASE_ASSIGNMENT_INTERVAL_MILLIS = 2 * LEASE_DURATION_MILLIS;
private LeaseCoordinator coordinator;
private final String leaseKey = "shd-1";
@ -128,7 +129,8 @@ public class DynamoDBLeaseCoordinatorIntegrationTest {
metricsFactory,
new LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig(),
LeaseManagementConfig.GracefulLeaseHandoffConfig.builder().build(),
new ConcurrentHashMap<>());
new ConcurrentHashMap<>(),
LEASE_ASSIGNMENT_INTERVAL_MILLIS);
dynamoDBCheckpointer = new DynamoDBCheckpointer(coordinator, leaseRefresher);
dynamoDBCheckpointer.operation(OPERATION);