Change agedFailoverTimeMultiplier config to doPriorityLeaseTaking

This commit is contained in:
Lucien Luc 2024-04-19 12:08:43 -07:00
parent e9990190cc
commit c7957d9a86
9 changed files with 84 additions and 49 deletions

View file

@ -217,7 +217,7 @@ public class KinesisClientLibConfiguration {
private AwsCredentialsProvider dynamoDBCredentialsProvider;
private AwsCredentialsProvider cloudWatchCredentialsProvider;
private long failoverTimeMillis;
private int agedFailoverTimeMultiplier;
private boolean doPriorityLeaseTaking;
private String workerIdentifier;
private long shardSyncIntervalMillis;
private int maxRecords;
@ -960,9 +960,8 @@ public class KinesisClientLibConfiguration {
return this;
}
public KinesisClientLibConfiguration withAgedFailoverTimeMultiplier(int agedFailoverTimeMultiplier) {
checkIsValuePositive("AgedFailoverTimeMultiplier", agedFailoverTimeMultiplier);
this.agedFailoverTimeMultiplier = agedFailoverTimeMultiplier;
/**
 * Sets whether priority lease taking is enabled on this configuration.
 *
 * @param doPriorityLeaseTaking
 *            the flag value to store on this configuration
 * @return this configuration instance, so calls can be chained
 */
public KinesisClientLibConfiguration withDoPriorityLeaseTaking(boolean doPriorityLeaseTaking) {
this.doPriorityLeaseTaking = doPriorityLeaseTaking;
return this;
}

View file

@ -87,7 +87,7 @@ public class MultiLangDaemonConfiguration {
@ConfigurationSettable(configurationClass = LeaseManagementConfig.class)
private long failoverTimeMillis;
@ConfigurationSettable(configurationClass = LeaseManagementConfig.class)
private int agedFailoverTimeMultiplier;
private Boolean doPriorityLeaseTaking;
@ConfigurationSettable(configurationClass = LeaseManagementConfig.class)
private long shardSyncIntervalMillis;
@ConfigurationSettable(configurationClass = LeaseManagementConfig.class)

View file

@ -91,14 +91,14 @@ public class MultiLangDaemonConfigurationTest {
}
@Test
public void testSetAgedFailoverTimeMultiplier() {
public void testSetDoPriorityLeaseTaking() {
MultiLangDaemonConfiguration configuration = baseConfiguration();
configuration.setAgedFailoverTimeMultiplier(5);
configuration.setDoPriorityLeaseTaking(Boolean.FALSE);
MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration = configuration
.resolvedConfiguration(shardRecordProcessorFactory);
MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration = configuration.resolvedConfiguration(
shardRecordProcessorFactory);
assertThat(resolvedConfiguration.leaseManagementConfig.agedFailoverTimeMultiplier(), equalTo(5));
assertThat(resolvedConfiguration.leaseManagementConfig.doPriorityLeaseTaking(), equalTo(false));
}
@Test

View file

@ -57,7 +57,7 @@ public class LeaseManagementConfig {
public static final long DEFAULT_COMPLETED_LEASE_CLEANUP_INTERVAL_MILLIS = Duration.ofMinutes(5).toMillis();
public static final long DEFAULT_GARBAGE_LEASE_CLEANUP_INTERVAL_MILLIS = Duration.ofMinutes(30).toMillis();
public static final long DEFAULT_PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 2 * 60 * 1000L;
public static final int DEFAULT_AGED_FAILOVER_TIME_MULTIPLIER = 3;
public static final boolean DEFAULT_DO_PRIORITY_LEASE_TAKING = true;
public static final int DEFAULT_CONSECUTIVE_HOLES_FOR_TRIGGERING_LEASE_RECOVERY = 3;
@ -104,13 +104,14 @@ public class LeaseManagementConfig {
private long failoverTimeMillis = 10000L;
/**
* Multiplier for the failoverTimeMillis in which leases which are expired for an extended period of time defined by
* (agedFailoverTimeMultiplier * failoverTimeMillis) are taken with priority, disregarding the target
* but obeying the maximum limit per worker.
* Whether workers should take very expired leases at priority. A very expired lease is one whose owner has not
* renewed it within 3 * {@link LeaseManagementConfig#failoverTimeMillis}. Very expired leases are taken at
* priority, disregarding the worker's target number of leases but still obeying
* {@link LeaseManagementConfig#maxLeasesForWorker}.
*
* <p>Default value: 3 </p>
* <p>Default value: true </p>
*/
private int agedFailoverTimeMultiplier = DEFAULT_AGED_FAILOVER_TIME_MULTIPLIER;
private boolean doPriorityLeaseTaking = DEFAULT_DO_PRIORITY_LEASE_TAKING;
/**
* Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks.
@ -380,7 +381,7 @@ public class LeaseManagementConfig {
workerIdentifier(),
executorService(),
failoverTimeMillis(),
agedFailoverTimeMultiplier(),
doPriorityLeaseTaking(),
epsilonMillis(),
maxLeasesForWorker(),
maxLeasesToStealAtOneTime(),

View file

@ -153,7 +153,7 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
final long initialLeaseTableWriteCapacity,
final MetricsFactory metricsFactory) {
this(leaseRefresher, workerIdentifier, leaseDurationMillis,
LeaseManagementConfig.DEFAULT_AGED_FAILOVER_TIME_MULTIPLIER, epsilonMillis, maxLeasesForWorker,
LeaseManagementConfig.DEFAULT_DO_PRIORITY_LEASE_TAKING, epsilonMillis, maxLeasesForWorker,
maxLeasesToStealAtOneTime, maxLeaseRenewerThreadCount,
TableConstants.DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY,
TableConstants.DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY, metricsFactory);
@ -168,8 +168,8 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
* Identifies the worker (e.g. useful to track lease ownership)
* @param leaseDurationMillis
* Duration of a lease
* @param agedFailoverTimeMultiplier
* Multiplier to determine when leases should be taken at priority
* @param doPriorityLeaseTaking
* Whether to do priority lease taking for very expired leases
* @param epsilonMillis
* Allow for some variance when calculating lease expirations
* @param maxLeasesForWorker
@ -186,7 +186,7 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
public DynamoDBLeaseCoordinator(final LeaseRefresher leaseRefresher,
final String workerIdentifier,
final long leaseDurationMillis,
final int agedFailoverTimeMultiplier,
final boolean doPriorityLeaseTaking,
final long epsilonMillis,
final int maxLeasesForWorker,
final int maxLeasesToStealAtOneTime,
@ -199,7 +199,7 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
this.leaseTaker = new DynamoDBLeaseTaker(leaseRefresher, workerIdentifier, leaseDurationMillis, metricsFactory)
.withMaxLeasesForWorker(maxLeasesForWorker)
.withMaxLeasesToStealAtOneTime(maxLeasesToStealAtOneTime)
.withVeryOldLeaseDurationNanosMultiplier(agedFailoverTimeMultiplier);
.withDoPriorityLeaseTaking(doPriorityLeaseTaking);
this.leaseRenewer = new DynamoDBLeaseRenewer(
leaseRefresher, workerIdentifier, leaseDurationMillis, leaseRenewalThreadpool, metricsFactory);
this.renewerIntervalMillis = getRenewerTakerIntervalMillis(leaseDurationMillis, epsilonMillis);

View file

@ -71,7 +71,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
private Function<StreamConfig, ShardDetector> customShardDetectorProvider;
private final long failoverTimeMillis;
private final int agedFailoverTimeMultiplier;
private final boolean doPriorityLeaseTaking;
private final long epsilonMillis;
private final int maxLeasesForWorker;
private final int maxLeasesToStealAtOneTime;
@ -563,7 +563,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
LeaseCleanupConfig leaseCleanupConfig) {
this(kinesisClient, dynamoDBClient, tableName,
workerIdentifier, executorService, failoverTimeMillis,
LeaseManagementConfig.DEFAULT_AGED_FAILOVER_TIME_MULTIPLIER, epsilonMillis, maxLeasesForWorker,
LeaseManagementConfig.DEFAULT_DO_PRIORITY_LEASE_TAKING, epsilonMillis, maxLeasesForWorker,
maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
@ -581,7 +581,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
* @param workerIdentifier
* @param executorService
* @param failoverTimeMillis
* @param agedFailoverTimeMultiplier
* @param doPriorityLeaseTaking
* @param epsilonMillis
* @param maxLeasesForWorker
* @param maxLeasesToStealAtOneTime
@ -610,7 +610,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient,
final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier,
final ExecutorService executorService, final long failoverTimeMillis,
final int agedFailoverTimeMultiplier, final long epsilonMillis,
final boolean doPriorityLeaseTaking, final long epsilonMillis,
final int maxLeasesForWorker, final int maxLeasesToStealAtOneTime, final int maxLeaseRenewalThreads,
final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards,
final long shardSyncIntervalMillis, final boolean consistentReads, final long listShardsBackoffTimeMillis,
@ -628,7 +628,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
this.workerIdentifier = workerIdentifier;
this.executorService = executorService;
this.failoverTimeMillis = failoverTimeMillis;
this.agedFailoverTimeMultiplier = agedFailoverTimeMultiplier;
this.doPriorityLeaseTaking = doPriorityLeaseTaking;
this.epsilonMillis = epsilonMillis;
this.maxLeasesForWorker = maxLeasesForWorker;
this.maxLeasesToStealAtOneTime = maxLeasesToStealAtOneTime;
@ -661,7 +661,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
return new DynamoDBLeaseCoordinator(this.createLeaseRefresher(),
workerIdentifier,
failoverTimeMillis,
agedFailoverTimeMultiplier,
doPriorityLeaseTaking,
epsilonMillis,
maxLeasesForWorker,
maxLeasesToStealAtOneTime,

View file

@ -69,7 +69,7 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
// TODO: Remove these defaults and use the defaults in the config
private int maxLeasesForWorker = Integer.MAX_VALUE;
private int maxLeasesToStealAtOneTime = 1;
private boolean doPriorityLeaseTaking = true;
private int veryOldLeaseDurationNanosMultiplier = 3;
private long lastScanTimeNanos = 0L;
@ -124,6 +124,11 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
return this;
}
/**
 * Enables or disables priority taking of leases that have been expired for an extended period of time.
 * When set to {@code false}, the very-old-lease check performed while computing leases to take is skipped.
 *
 * @param doPriorityLeaseTaking
 *            whether very old leases should be taken with priority
 * @return this lease taker, so calls can be chained
 */
public DynamoDBLeaseTaker withDoPriorityLeaseTaking(boolean doPriorityLeaseTaking) {
this.doPriorityLeaseTaking = doPriorityLeaseTaking;
return this;
}
/**
* Max leases to steal from a more loaded Worker at one time (for load balancing).
* Setting this to a higher number can allow for faster load convergence (e.g. during deployments, cold starts),
@ -441,6 +446,7 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
// If there are leases that have been expired for an extended period of
// time, take them with priority, disregarding the target (computed
// later) but obeying the maximum limit per worker.
if (doPriorityLeaseTaking) {
long currentNanoTime;
try {
currentNanoTime = timeProvider.call();
@ -448,7 +454,8 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
throw new DependencyException("Exception caught from timeProvider", e);
}
final long nanoThreshold = currentNanoTime - (veryOldLeaseDurationNanosMultiplier * leaseDurationNanos);
final List<Lease> veryOldLeases = allLeases.values().stream()
final List<Lease> veryOldLeases = allLeases.values()
.stream()
.filter(lease -> nanoThreshold > lease.lastCounterIncrementNanos())
.collect(Collectors.toList());
@ -461,6 +468,7 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
}
return result;
}
}
if (numLeasesToReachTarget <= 0) {
// If we don't need anything, return the empty set.

View file

@ -19,7 +19,7 @@ import static org.mockito.Mockito.when;
public class DynamoDBLeaseCoordinatorTest {
private static final String WORKER_ID = UUID.randomUUID().toString();
private static final int VERY_OLD_LEASE_DURATION_MULTIPLIER = 5;
private static final boolean DO_PRIORITY_LEASE_TAKING = true;
private static final long LEASE_DURATION_MILLIS = 5000L;
private static final long EPSILON_MILLIS = 25L;
private static final int MAX_LEASES_FOR_WORKER = Integer.MAX_VALUE;
@ -40,7 +40,7 @@ public class DynamoDBLeaseCoordinatorTest {
@Before
public void setup() {
this.leaseCoordinator = new DynamoDBLeaseCoordinator(leaseRefresher, WORKER_ID, LEASE_DURATION_MILLIS,
VERY_OLD_LEASE_DURATION_MULTIPLIER, EPSILON_MILLIS, MAX_LEASES_FOR_WORKER,
DO_PRIORITY_LEASE_TAKING, EPSILON_MILLIS, MAX_LEASES_FOR_WORKER,
MAX_LEASES_TO_STEAL_AT_ONE_TIME, MAX_LEASE_RENEWER_THREAD_COUNT,
INITIAL_LEASE_TABLE_READ_CAPACITY, INITIAL_LEASE_TABLE_WRITE_CAPACITY, metricsFactory);
}

View file

@ -48,6 +48,7 @@ public class DynamoDBLeaseTakerTest {
private static final String WORKER_IDENTIFIER = "foo";
private static final long LEASE_DURATION_MILLIS = 1000L;
private static final int DEFAULT_VERY_OLD_LEASE_DURATION_MULTIPLIER = 3;
private static final int VERY_OLD_LEASE_DURATION_MULTIPLIER = 5;
private static final long MOCK_CURRENT_TIME = 10000000000L;
@ -151,6 +152,32 @@ public class DynamoDBLeaseTakerTest {
assertEquals(expectedOutput, output);
}
@Test
public void test_disableDoPriorityLeaseTakingGetsCorrectLeases() throws Exception {
    // Cut-off below which a lease would count as "very old" under the default multiplier.
    final long leaseDurationNanos = TimeUnit.MILLISECONDS.toNanos(LEASE_DURATION_MILLIS);
    final long veryOldCutoff =
            MOCK_CURRENT_TIME - (leaseDurationNanos * DEFAULT_VERY_OLD_LEASE_DURATION_MULTIPLIER);
    final long justOlderThanCutoff = veryOldCutoff - 1;
    final long justNewerThanCutoff = veryOldCutoff + 1;

    // Lease taker under test, with priority lease taking switched off.
    final DynamoDBLeaseTaker takerWithoutPriority =
            new DynamoDBLeaseTaker(leaseRefresher, WORKER_IDENTIFIER, LEASE_DURATION_MILLIS, metricsFactory)
                    .withDoPriorityLeaseTaking(false);

    // One current lease plus two expired leases straddling the very-old cut-off.
    final List<Lease> knownLeases = new ArrayList<>();
    knownLeases.add(createLease("foo", "2", MOCK_CURRENT_TIME));
    knownLeases.add(createLease("bar", "3", justOlderThanCutoff));
    knownLeases.add(createLease("baz", "4", justNewerThanCutoff));
    final List<Lease> expired = knownLeases.subList(1, 3);

    // Seed the taker's view of the lease table, keyed by lease key.
    for (final Lease lease : knownLeases) {
        takerWithoutPriority.allLeases.put(lease.leaseKey(), lease);
    }

    when(timeProvider.call()).thenReturn(MOCK_CURRENT_TIME);
    when(metricsFactory.createMetrics()).thenReturn(new NullMetricsScope());
    when(leaseRefresher.listLeases()).thenReturn(knownLeases);

    final Set<Lease> actual = takerWithoutPriority.computeLeasesToTake(expired, timeProvider);

    // Both expired leases are expected back, not only the one older than the cut-off.
    final Set<Lease> expected = new HashSet<>();
    expected.add(createLease("bar", "3", justOlderThanCutoff));
    expected.add(createLease("baz", "4", justNewerThanCutoff));
    assertEquals(expected, actual);
}
private Lease createLease(String leaseOwner, String leaseKey) {
final Lease lease = new Lease();
lease.checkpoint(new ExtendedSequenceNumber("checkpoint"));