Expose veryOldLeaseDurationMultiplier in LeaseManagementConfig (#1307)

* Expose veryOldLeaseDurationMultiplier in LeaseManagementConfig as agedFailoverTimeMultiplier
This commit is contained in:
lucienlu-aws 2024-04-10 11:24:53 -07:00 committed by GitHub
parent 7f1f243676
commit 981899499f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 216 additions and 12 deletions

View file

@ -217,6 +217,7 @@ public class KinesisClientLibConfiguration {
private AwsCredentialsProvider dynamoDBCredentialsProvider;
private AwsCredentialsProvider cloudWatchCredentialsProvider;
private long failoverTimeMillis;
private int agedFailoverTimeMultiplier;
private String workerIdentifier;
private long shardSyncIntervalMillis;
private int maxRecords;
@ -959,6 +960,12 @@ public class KinesisClientLibConfiguration {
return this;
}
public KinesisClientLibConfiguration withAgedFailoverTimeMultiplier(int agedFailoverTimeMultiplier) {
checkIsValuePositive("AgedFailoverTimeMultiplier", agedFailoverTimeMultiplier);
this.agedFailoverTimeMultiplier = agedFailoverTimeMultiplier;
return this;
}
/**
* @param shardSyncIntervalMillis
* Time between tasks to sync leases and Kinesis shards

View file

@ -87,6 +87,8 @@ public class MultiLangDaemonConfiguration {
@ConfigurationSettable(configurationClass = LeaseManagementConfig.class)
private long failoverTimeMillis;
@ConfigurationSettable(configurationClass = LeaseManagementConfig.class)
private int agedFailoverTimeMultiplier;
@ConfigurationSettable(configurationClass = LeaseManagementConfig.class)
private long shardSyncIntervalMillis;
@ConfigurationSettable(configurationClass = LeaseManagementConfig.class)
private boolean cleanupLeasesUponShardCompletion;

View file

@ -90,6 +90,17 @@ public class MultiLangDaemonConfigurationTest {
assertThat(resolvedConfiguration.leaseManagementConfig.maxLeasesForWorker(), equalTo(10));
}
@Test
public void testSetAgedFailoverTimeMultiplier() {
MultiLangDaemonConfiguration configuration = baseConfiguration();
configuration.setAgedFailoverTimeMultiplier(5);
MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration = configuration
.resolvedConfiguration(shardRecordProcessorFactory);
assertThat(resolvedConfiguration.leaseManagementConfig.agedFailoverTimeMultiplier(), equalTo(5));
}
@Test
public void testDefaultRetrievalConfig() {
MultiLangDaemonConfiguration configuration = baseConfiguration();

View file

@ -57,6 +57,7 @@ public class LeaseManagementConfig {
public static final long DEFAULT_COMPLETED_LEASE_CLEANUP_INTERVAL_MILLIS = Duration.ofMinutes(5).toMillis();
public static final long DEFAULT_GARBAGE_LEASE_CLEANUP_INTERVAL_MILLIS = Duration.ofMinutes(30).toMillis();
public static final long DEFAULT_PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 2 * 60 * 1000L;
public static final int DEFAULT_AGED_FAILOVER_TIME_MULTIPLIER = 3;
public static final int DEFAULT_CONSECUTIVE_HOLES_FOR_TRIGGERING_LEASE_RECOVERY = 3;
@ -102,6 +103,15 @@ public class LeaseManagementConfig {
*/
private long failoverTimeMillis = 10000L;
/**
* Multiplier for the failoverTimeMillis in which leases which are expired for an extended period of time defined by
* (agedFailoverTimeMultiplier * failoverTimeMillis) are taken with priority, disregarding the target
* but obeying the maximum limit per worker.
*
* <p>Default value: 3 </p>
*/
private int agedFailoverTimeMultiplier = DEFAULT_AGED_FAILOVER_TIME_MULTIPLIER;
/**
* Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks.
*
@ -370,6 +380,7 @@ public class LeaseManagementConfig {
workerIdentifier(),
executorService(),
failoverTimeMillis(),
agedFailoverTimeMultiplier(),
epsilonMillis(),
maxLeasesForWorker(),
maxLeasesToStealAtOneTime(),

View file

@ -34,6 +34,7 @@ import lombok.extern.slf4j.Slf4j;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseManagementConfig;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.LeaseRenewer;
import software.amazon.kinesis.leases.LeaseTaker;
@ -140,6 +141,7 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
* @param metricsFactory
* Used to publish metrics about lease operations
*/
@Deprecated
public DynamoDBLeaseCoordinator(final LeaseRefresher leaseRefresher,
final String workerIdentifier,
final long leaseDurationMillis,
@ -150,11 +152,54 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
final long initialLeaseTableReadCapacity,
final long initialLeaseTableWriteCapacity,
final MetricsFactory metricsFactory) {
this(leaseRefresher, workerIdentifier, leaseDurationMillis,
LeaseManagementConfig.DEFAULT_AGED_FAILOVER_TIME_MULTIPLIER, epsilonMillis, maxLeasesForWorker,
maxLeasesToStealAtOneTime, maxLeaseRenewerThreadCount,
TableConstants.DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY,
TableConstants.DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY, metricsFactory);
}
/**
* Constructor.
*
* @param leaseRefresher
* LeaseRefresher instance to use
* @param workerIdentifier
* Identifies the worker (e.g. useful to track lease ownership)
* @param leaseDurationMillis
* Duration of a lease
* @param agedFailoverTimeMultiplier
* Multiplier to determine when leases should be taken at priority
* @param epsilonMillis
* Allow for some variance when calculating lease expirations
* @param maxLeasesForWorker
* Max leases this Worker can handle at a time
* @param maxLeasesToStealAtOneTime
* Steal up to these many leases at a time (for load balancing)
* @param initialLeaseTableReadCapacity
* Initial dynamodb lease table read iops if creating the lease table
* @param initialLeaseTableWriteCapacity
* Initial dynamodb lease table write iops if creating the lease table
* @param metricsFactory
* Used to publish metrics about lease operations
*/
public DynamoDBLeaseCoordinator(final LeaseRefresher leaseRefresher,
final String workerIdentifier,
final long leaseDurationMillis,
final int agedFailoverTimeMultiplier,
final long epsilonMillis,
final int maxLeasesForWorker,
final int maxLeasesToStealAtOneTime,
final int maxLeaseRenewerThreadCount,
final long initialLeaseTableReadCapacity,
final long initialLeaseTableWriteCapacity,
final MetricsFactory metricsFactory) {
this.leaseRefresher = leaseRefresher;
this.leaseRenewalThreadpool = getLeaseRenewalExecutorService(maxLeaseRenewerThreadCount);
this.leaseTaker = new DynamoDBLeaseTaker(leaseRefresher, workerIdentifier, leaseDurationMillis, metricsFactory)
.withMaxLeasesForWorker(maxLeasesForWorker)
.withMaxLeasesToStealAtOneTime(maxLeasesToStealAtOneTime);
.withMaxLeasesToStealAtOneTime(maxLeasesToStealAtOneTime)
.withVeryOldLeaseDurationNanosMultiplier(agedFailoverTimeMultiplier);
this.leaseRenewer = new DynamoDBLeaseRenewer(
leaseRefresher, workerIdentifier, leaseDurationMillis, leaseRenewalThreadpool, metricsFactory);
this.renewerIntervalMillis = getRenewerTakerIntervalMillis(leaseDurationMillis, epsilonMillis);

View file

@ -71,6 +71,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
private Function<StreamConfig, ShardDetector> customShardDetectorProvider;
private final long failoverTimeMillis;
private final int agedFailoverTimeMultiplier;
private final long epsilonMillis;
private final int maxLeasesForWorker;
private final int maxLeasesToStealAtOneTime;
@ -487,6 +488,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
* @param leaseTableDeletionProtectionEnabled
* @param tags
*/
@Deprecated
private DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final StreamConfig streamConfig,
final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier,
final ExecutorService executorService, final long failoverTimeMillis, final long epsilonMillis,
@ -544,6 +546,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
* @param isMultiStreamMode
* @param leaseCleanupConfig
*/
@Deprecated
public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient,
final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier,
final ExecutorService executorService, final long failoverTimeMillis, final long epsilonMillis,
@ -558,12 +561,74 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
Collection<Tag> tags, LeaseSerializer leaseSerializer,
Function<StreamConfig, ShardDetector> customShardDetectorProvider, boolean isMultiStreamMode,
LeaseCleanupConfig leaseCleanupConfig) {
this(kinesisClient, dynamoDBClient, tableName,
workerIdentifier, executorService, failoverTimeMillis,
LeaseManagementConfig.DEFAULT_AGED_FAILOVER_TIME_MULTIPLIER, epsilonMillis, maxLeasesForWorker,
maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity,
deprecatedHierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode,
leaseTableDeletionProtectionEnabled, tags, leaseSerializer, customShardDetectorProvider, isMultiStreamMode,
leaseCleanupConfig);
}
/**
* Constructor.
* @param kinesisClient
* @param dynamoDBClient
* @param tableName
* @param workerIdentifier
* @param executorService
* @param failoverTimeMillis
* @param agedFailoverTimeMultiplier
* @param epsilonMillis
* @param maxLeasesForWorker
* @param maxLeasesToStealAtOneTime
* @param maxLeaseRenewalThreads
* @param cleanupLeasesUponShardCompletion
* @param ignoreUnexpectedChildShards
* @param shardSyncIntervalMillis
* @param consistentReads
* @param listShardsBackoffTimeMillis
* @param maxListShardsRetryAttempts
* @param maxCacheMissesBeforeReload
* @param listShardsCacheAllowedAgeInSeconds
* @param cacheMissWarningModulus
* @param initialLeaseTableReadCapacity
* @param initialLeaseTableWriteCapacity
* @param deprecatedHierarchicalShardSyncer
* @param tableCreatorCallback
* @param dynamoDbRequestTimeout
* @param billingMode
* @param leaseTableDeletionProtectionEnabled
* @param leaseSerializer
* @param customShardDetectorProvider
* @param isMultiStreamMode
* @param leaseCleanupConfig
*/
public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient,
final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier,
final ExecutorService executorService, final long failoverTimeMillis,
final int agedFailoverTimeMultiplier, final long epsilonMillis,
final int maxLeasesForWorker, final int maxLeasesToStealAtOneTime, final int maxLeaseRenewalThreads,
final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards,
final long shardSyncIntervalMillis, final boolean consistentReads, final long listShardsBackoffTimeMillis,
final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload,
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus,
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity,
final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
Duration dynamoDbRequestTimeout, BillingMode billingMode, final boolean leaseTableDeletionProtectionEnabled,
Collection<Tag> tags, LeaseSerializer leaseSerializer,
Function<StreamConfig, ShardDetector> customShardDetectorProvider, boolean isMultiStreamMode,
LeaseCleanupConfig leaseCleanupConfig) {
this.kinesisClient = kinesisClient;
this.dynamoDBClient = dynamoDBClient;
this.tableName = tableName;
this.workerIdentifier = workerIdentifier;
this.executorService = executorService;
this.failoverTimeMillis = failoverTimeMillis;
this.agedFailoverTimeMultiplier = agedFailoverTimeMultiplier;
this.epsilonMillis = epsilonMillis;
this.maxLeasesForWorker = maxLeasesForWorker;
this.maxLeasesToStealAtOneTime = maxLeasesToStealAtOneTime;
@ -596,6 +661,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
return new DynamoDBLeaseCoordinator(this.createLeaseRefresher(),
workerIdentifier,
failoverTimeMillis,
agedFailoverTimeMultiplier,
epsilonMillis,
maxLeasesForWorker,
maxLeasesToStealAtOneTime,

View file

@ -70,7 +70,7 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
private int maxLeasesForWorker = Integer.MAX_VALUE;
private int maxLeasesToStealAtOneTime = 1;
private long veryOldLeaseDurationNanosMultiplier = 3;
private int veryOldLeaseDurationNanosMultiplier = 3;
private long lastScanTimeNanos = 0L;
public DynamoDBLeaseTaker(LeaseRefresher leaseRefresher, String workerIdentifier, long leaseDurationMillis,
@ -103,15 +103,24 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
return this;
}
/**
* @deprecated Misspelled method, use {@link DynamoDBLeaseTaker#withVeryOldLeaseDurationNanosMultiplier(int)}
*/
@Deprecated
public DynamoDBLeaseTaker withVeryOldLeaseDurationNanosMultipler(long veryOldLeaseDurationNanosMultipler) {
this.veryOldLeaseDurationNanosMultiplier = (int) veryOldLeaseDurationNanosMultipler;
return this;
}
/**
* Overrides the default very old lease duration nanos multiplier to increase the threshold for taking very old leases.
* Setting this to a higher value than 3 will increase the threshold for very old lease taking.
*
* @param veryOldLeaseDurationNanosMultipler Very old lease duration multiplier for adjusting very old lease taking.
* @param veryOldLeaseDurationNanosMultiplier Very old lease duration multiplier for adjusting very old lease taking.
* @return LeaseTaker
*/
public DynamoDBLeaseTaker withVeryOldLeaseDurationNanosMultipler(long veryOldLeaseDurationNanosMultipler) {
this.veryOldLeaseDurationNanosMultiplier = veryOldLeaseDurationNanosMultipler;
public DynamoDBLeaseTaker withVeryOldLeaseDurationNanosMultiplier(int veryOldLeaseDurationNanosMultiplier) {
this.veryOldLeaseDurationNanosMultiplier = veryOldLeaseDurationNanosMultiplier;
return this;
}
@ -191,7 +200,7 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
List<Lease> expiredLeases = getExpiredLeases();
Set<Lease> leasesToTake = computeLeasesToTake(expiredLeases);
Set<Lease> leasesToTake = computeLeasesToTake(expiredLeases, timeProvider);
leasesToTake = updateStaleLeasesWithLatestState(updateAllLeasesTotalTimeMillis, leasesToTake);
Set<String> untakenLeaseKeys = new HashSet<>();
@ -374,9 +383,11 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
* Compute the number of leases I should try to take based on the state of the system.
*
* @param expiredLeases list of leases we determined to be expired
* @param timeProvider callable which returns the current time in nanos
* @return set of leases to take.
*/
private Set<Lease> computeLeasesToTake(List<Lease> expiredLeases) {
@VisibleForTesting
Set<Lease> computeLeasesToTake(List<Lease> expiredLeases, Callable<Long> timeProvider) throws DependencyException {
Map<String, Integer> leaseCounts = computeLeaseCounts(expiredLeases);
Set<Lease> leasesToTake = new HashSet<>();
final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, TAKE_LEASES_DIMENSION);
@ -430,7 +441,13 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
// If there are leases that have been expired for an extended period of
// time, take them with priority, disregarding the target (computed
// later) but obeying the maximum limit per worker.
final long nanoThreshold = System.nanoTime() - (veryOldLeaseDurationNanosMultiplier * leaseDurationNanos);
long currentNanoTime;
try {
currentNanoTime = timeProvider.call();
} catch (Exception e) {
throw new DependencyException("Exception caught from timeProvider", e);
}
final long nanoThreshold = currentNanoTime - (veryOldLeaseDurationNanosMultiplier * leaseDurationNanos);
final List<Lease> veryOldLeases = allLeases.values().stream()
.filter(lease -> nanoThreshold > lease.lastCounterIncrementNanos())
.collect(Collectors.toList());

View file

@ -19,6 +19,7 @@ import static org.mockito.Mockito.when;
public class DynamoDBLeaseCoordinatorTest {
private static final String WORKER_ID = UUID.randomUUID().toString();
private static final int VERY_OLD_LEASE_DURATION_MULTIPLIER = 5;
private static final long LEASE_DURATION_MILLIS = 5000L;
private static final long EPSILON_MILLIS = 25L;
private static final int MAX_LEASES_FOR_WORKER = Integer.MAX_VALUE;
@ -39,7 +40,8 @@ public class DynamoDBLeaseCoordinatorTest {
@Before
public void setup() {
this.leaseCoordinator = new DynamoDBLeaseCoordinator(leaseRefresher, WORKER_ID, LEASE_DURATION_MILLIS,
EPSILON_MILLIS, MAX_LEASES_FOR_WORKER, MAX_LEASES_TO_STEAL_AT_ONE_TIME, MAX_LEASE_RENEWER_THREAD_COUNT,
VERY_OLD_LEASE_DURATION_MULTIPLIER, EPSILON_MILLIS, MAX_LEASES_FOR_WORKER,
MAX_LEASES_TO_STEAL_AT_ONE_TIME, MAX_LEASE_RENEWER_THREAD_COUNT,
INITIAL_LEASE_TABLE_READ_CAPACITY, INITIAL_LEASE_TABLE_WRITE_CAPACITY, metricsFactory);
}

View file

@ -102,7 +102,7 @@ public class DynamoDBLeaseTakerIntegrationTest extends LeaseIntegrationTest {
builder.withLease("4", "bar").build();
// setting multiplier to unusually high number to avoid very old lease taking
taker.withVeryOldLeaseDurationNanosMultipler(5000000000L);
taker.withVeryOldLeaseDurationNanosMultiplier(5000000);
builder.takeMutateAssert(taker, 2);
}

View file

@ -22,7 +22,9 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -46,6 +48,8 @@ public class DynamoDBLeaseTakerTest {
private static final String WORKER_IDENTIFIER = "foo";
private static final long LEASE_DURATION_MILLIS = 1000L;
private static final int VERY_OLD_LEASE_DURATION_MULTIPLIER = 5;
private static final long MOCK_CURRENT_TIME = 10000000000L;
private DynamoDBLeaseTaker dynamoDBLeaseTaker;
@ -88,7 +92,7 @@ public class DynamoDBLeaseTakerTest {
when(leaseRefresher.listLeases()).thenReturn(leases);
when(metricsFactory.createMetrics()).thenReturn(new NullMetricsScope());
when(timeProvider.call()).thenReturn(1000L);
when(timeProvider.call()).thenReturn(MOCK_CURRENT_TIME);
final Map<String, Integer> actualOutput = dynamoDBLeaseTaker.computeLeaseCounts(ImmutableList.of());
@ -112,7 +116,7 @@ public class DynamoDBLeaseTakerTest {
when(leaseRefresher.listLeases()).thenReturn(leases);
when(metricsFactory.createMetrics()).thenReturn(new NullMetricsScope());
when(timeProvider.call()).thenReturn(1000L);
when(timeProvider.call()).thenReturn(MOCK_CURRENT_TIME);
final Map<String, Integer> actualOutput = dynamoDBLeaseTaker.computeLeaseCounts(leases);
@ -121,6 +125,32 @@ public class DynamoDBLeaseTakerTest {
assertEquals(expectedOutput, actualOutput);
}
@Test
public void test_veryOldLeaseDurationNanosMultiplierGetsCorrectLeases() throws Exception {
long veryOldThreshold = MOCK_CURRENT_TIME -
(TimeUnit.MILLISECONDS.toNanos(LEASE_DURATION_MILLIS) * VERY_OLD_LEASE_DURATION_MULTIPLIER);
DynamoDBLeaseTaker dynamoDBLeaseTakerWithCustomMultiplier =
new DynamoDBLeaseTaker(leaseRefresher, WORKER_IDENTIFIER, LEASE_DURATION_MILLIS, metricsFactory)
.withVeryOldLeaseDurationNanosMultiplier(VERY_OLD_LEASE_DURATION_MULTIPLIER);
final List<Lease> allLeases = new ImmutableList.Builder<Lease>()
.add(createLease("foo", "2", MOCK_CURRENT_TIME))
.add(createLease("bar", "3", veryOldThreshold - 1))
.add(createLease("baz", "4", veryOldThreshold))
.build();
final List<Lease> expiredLeases = allLeases.subList(1, 3);
dynamoDBLeaseTakerWithCustomMultiplier.allLeases.putAll(
allLeases.stream().collect(Collectors.toMap(Lease::leaseKey, Function.identity())));
when(leaseRefresher.listLeases()).thenReturn(allLeases);
when(metricsFactory.createMetrics()).thenReturn(new NullMetricsScope());
when(timeProvider.call()).thenReturn(MOCK_CURRENT_TIME);
Set<Lease> output = dynamoDBLeaseTakerWithCustomMultiplier.computeLeasesToTake(expiredLeases, timeProvider);
final Set<Lease> expectedOutput = new HashSet<>();
expectedOutput.add(allLeases.get(1));
assertEquals(expectedOutput, output);
}
private Lease createLease(String leaseOwner, String leaseKey) {
final Lease lease = new Lease();
lease.checkpoint(new ExtendedSequenceNumber("checkpoint"));
@ -132,4 +162,17 @@ public class DynamoDBLeaseTakerTest {
lease.leaseKey(leaseKey);
return lease;
}
private Lease createLease(String leaseOwner, String leaseKey, long lastCounterIncrementNanos) {
final Lease lease = new Lease();
lease.checkpoint(new ExtendedSequenceNumber("checkpoint"));
lease.ownerSwitchesSinceCheckpoint(0L);
lease.leaseCounter(0L);
lease.leaseOwner(leaseOwner);
lease.parentShardIds(Collections.singleton("parentShardId"));
lease.childShardIds(new HashSet<>());
lease.leaseKey(leaseKey);
lease.lastCounterIncrementNanos(lastCounterIncrementNanos);
return lease;
}
}