From 96be30b3e75f44c86883f755f6b2a7fe1d84356f Mon Sep 17 00:00:00 2001 From: lucienlu-aws <132623944+lucienlu-aws@users.noreply.github.com> Date: Mon, 22 Apr 2024 10:35:08 -0700 Subject: [PATCH] Change agedFailoverTimeMultiplier config to doPriorityLeaseTaking (#1317) * Change agedFailoverTimeMultiplier config to doPriorityLeaseTaking --- .../KinesisClientLibConfiguration.java | 7 ++- .../config/MultiLangDaemonConfiguration.java | 2 +- .../MultiLangDaemonConfigurationTest.java | 10 ++-- .../kinesis/leases/LeaseManagementConfig.java | 16 ++++--- .../dynamodb/DynamoDBLeaseCoordinator.java | 10 ++-- .../DynamoDBLeaseManagementFactory.java | 13 +++--- .../leases/dynamodb/DynamoDBLeaseTaker.java | 46 +++++++++++-------- .../DynamoDBLeaseCoordinatorTest.java | 4 +- .../dynamodb/DynamoDBLeaseTakerTest.java | 31 +++++++++++++ 9 files changed, 89 insertions(+), 50 deletions(-) diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java index 12a8fc9c..d8d9068d 100644 --- a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java +++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java @@ -217,7 +217,7 @@ public class KinesisClientLibConfiguration { private AwsCredentialsProvider dynamoDBCredentialsProvider; private AwsCredentialsProvider cloudWatchCredentialsProvider; private long failoverTimeMillis; - private int agedFailoverTimeMultiplier; + private boolean enablePriorityLeaseAssignment; private String workerIdentifier; private long shardSyncIntervalMillis; private int maxRecords; @@ -960,9 +960,8 @@ public class KinesisClientLibConfiguration { return this; } - public KinesisClientLibConfiguration withAgedFailoverTimeMultiplier(int agedFailoverTimeMultiplier) { - checkIsValuePositive("AgedFailoverTimeMultiplier", agedFailoverTimeMultiplier); - this.agedFailoverTimeMultiplier = agedFailoverTimeMultiplier; + public KinesisClientLibConfiguration withEnablePriorityLeaseAssignment(boolean enablePriorityLeaseAssignment) { + this.enablePriorityLeaseAssignment = enablePriorityLeaseAssignment; return this; } diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java index dbaa14f9..8b6bc5e6 100644 --- a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java +++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java @@ -87,7 +87,7 @@ public class MultiLangDaemonConfiguration { @ConfigurationSettable(configurationClass = LeaseManagementConfig.class) private long failoverTimeMillis; @ConfigurationSettable(configurationClass = LeaseManagementConfig.class) - private int agedFailoverTimeMultiplier; + private Boolean enablePriorityLeaseAssignment; @ConfigurationSettable(configurationClass = LeaseManagementConfig.class) private long shardSyncIntervalMillis; @ConfigurationSettable(configurationClass = LeaseManagementConfig.class) diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfigurationTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfigurationTest.java index 2bbdde9f..da18e659 100644 --- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfigurationTest.java +++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfigurationTest.java @@ -91,14 +91,14 @@ public class MultiLangDaemonConfigurationTest { } @Test - public void testSetAgedFailoverTimeMultiplier() { + public void testSetEnablePriorityLeaseAssignment() { MultiLangDaemonConfiguration configuration = baseConfiguration(); - configuration.setAgedFailoverTimeMultiplier(5); + configuration.setEnablePriorityLeaseAssignment(false); - MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration = configuration - .resolvedConfiguration(shardRecordProcessorFactory); + MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration = configuration.resolvedConfiguration( + shardRecordProcessorFactory); - assertThat(resolvedConfiguration.leaseManagementConfig.agedFailoverTimeMultiplier(), equalTo(5)); + assertThat(resolvedConfiguration.leaseManagementConfig.enablePriorityLeaseAssignment(), equalTo(false)); } @Test diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java index 58c9f382..aef4d87e 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java @@ -57,7 +57,7 @@ public class LeaseManagementConfig { public static final long DEFAULT_COMPLETED_LEASE_CLEANUP_INTERVAL_MILLIS = Duration.ofMinutes(5).toMillis(); public static final long DEFAULT_GARBAGE_LEASE_CLEANUP_INTERVAL_MILLIS = Duration.ofMinutes(30).toMillis(); public static final long DEFAULT_PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 2 * 60 * 1000L; - public static final int DEFAULT_AGED_FAILOVER_TIME_MULTIPLIER = 3; + public static final boolean DEFAULT_ENABLE_PRIORITY_LEASE_ASSIGNMENT = true; public static final int DEFAULT_CONSECUTIVE_HOLES_FOR_TRIGGERING_LEASE_RECOVERY = 3; @@ -104,13 +104,15 @@ public class LeaseManagementConfig { private long failoverTimeMillis = 10000L; /** - * Multiplier for the failoverTimeMillis in which leases which are expired for an extended period of time defined by - * (agedFailoverTimeMultiplier * failoverTimeMillis) are taken with priority, disregarding the target - * but obeying the maximum limit per worker. + * Whether workers should take very expired leases at priority. A very expired lease is when a worker does not + * renew its lease in 3 * {@link LeaseManagementConfig#failoverTimeMillis}. Very expired leases will be taken at + * priority for a worker which disregards the target leases for the worker but obeys + * {@link LeaseManagementConfig#maxLeasesForWorker}. New leases for new shards due to shard mutation are + * considered to be very expired and taken with priority. * - *

Default value: 3

+ *

Default value: true

*/ - private int agedFailoverTimeMultiplier = DEFAULT_AGED_FAILOVER_TIME_MULTIPLIER; + private boolean enablePriorityLeaseAssignment = DEFAULT_ENABLE_PRIORITY_LEASE_ASSIGNMENT; /** * Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks. @@ -380,7 +382,7 @@ public class LeaseManagementConfig { workerIdentifier(), executorService(), failoverTimeMillis(), - agedFailoverTimeMultiplier(), + enablePriorityLeaseAssignment(), epsilonMillis(), maxLeasesForWorker(), maxLeasesToStealAtOneTime(), diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java index ef2b236f..6c0803f2 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java @@ -153,7 +153,7 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator { final long initialLeaseTableWriteCapacity, final MetricsFactory metricsFactory) { this(leaseRefresher, workerIdentifier, leaseDurationMillis, - LeaseManagementConfig.DEFAULT_AGED_FAILOVER_TIME_MULTIPLIER, epsilonMillis, maxLeasesForWorker, + LeaseManagementConfig.DEFAULT_ENABLE_PRIORITY_LEASE_ASSIGNMENT, epsilonMillis, maxLeasesForWorker, maxLeasesToStealAtOneTime, maxLeaseRenewerThreadCount, TableConstants.DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY, TableConstants.DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY, metricsFactory); @@ -168,8 +168,8 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator { * Identifies the worker (e.g. useful to track lease ownership) * @param leaseDurationMillis * Duration of a lease - * @param agedFailoverTimeMultiplier - * Multiplier to determine when leases should be taken at priority + * @param enablePriorityLeaseAssignment + * Whether to enable priority lease assignment for very expired leases * @param epsilonMillis * Allow for some variance when calculating lease expirations * @param maxLeasesForWorker @@ -186,7 +186,7 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator { public DynamoDBLeaseCoordinator(final LeaseRefresher leaseRefresher, final String workerIdentifier, final long leaseDurationMillis, - final int agedFailoverTimeMultiplier, + final boolean enablePriorityLeaseAssignment, final long epsilonMillis, final int maxLeasesForWorker, final int maxLeasesToStealAtOneTime, @@ -199,7 +199,7 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator { this.leaseTaker = new DynamoDBLeaseTaker(leaseRefresher, workerIdentifier, leaseDurationMillis, metricsFactory) .withMaxLeasesForWorker(maxLeasesForWorker) .withMaxLeasesToStealAtOneTime(maxLeasesToStealAtOneTime) - .withVeryOldLeaseDurationNanosMultiplier(agedFailoverTimeMultiplier); + .withEnablePriorityLeaseAssignment(enablePriorityLeaseAssignment); this.leaseRenewer = new DynamoDBLeaseRenewer( leaseRefresher, workerIdentifier, leaseDurationMillis, leaseRenewalThreadpool, metricsFactory); this.renewerIntervalMillis = getRenewerTakerIntervalMillis(leaseDurationMillis, epsilonMillis); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java index e447397b..7d9ebeef 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java @@ -71,7 +71,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { private Function customShardDetectorProvider; private final long failoverTimeMillis; - private final int agedFailoverTimeMultiplier; + private final boolean enablePriorityLeaseAssignment; private final long epsilonMillis; private final int maxLeasesForWorker; private final int maxLeasesToStealAtOneTime; @@ -563,7 +563,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { LeaseCleanupConfig leaseCleanupConfig) { this(kinesisClient, dynamoDBClient, tableName, workerIdentifier, executorService, failoverTimeMillis, - LeaseManagementConfig.DEFAULT_AGED_FAILOVER_TIME_MULTIPLIER, epsilonMillis, maxLeasesForWorker, + LeaseManagementConfig.DEFAULT_ENABLE_PRIORITY_LEASE_ASSIGNMENT, epsilonMillis, maxLeasesForWorker, maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis, maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds, @@ -581,7 +581,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { * @param workerIdentifier * @param executorService * @param failoverTimeMillis - * @param agedFailoverTimeMultiplier + * @param enablePriorityLeaseAssignment * @param epsilonMillis * @param maxLeasesForWorker * @param maxLeasesToStealAtOneTime @@ -610,7 +610,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier, final ExecutorService executorService, final long failoverTimeMillis, - final int agedFailoverTimeMultiplier, final long epsilonMillis, + final boolean enablePriorityLeaseAssignment, final long epsilonMillis, final int maxLeasesForWorker, final int maxLeasesToStealAtOneTime, final int maxLeaseRenewalThreads, final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards, final long shardSyncIntervalMillis, final boolean consistentReads, final long listShardsBackoffTimeMillis, @@ -628,7 +628,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { this.workerIdentifier = workerIdentifier; this.executorService = executorService; this.failoverTimeMillis = failoverTimeMillis; - this.agedFailoverTimeMultiplier = agedFailoverTimeMultiplier; + this.enablePriorityLeaseAssignment = enablePriorityLeaseAssignment; this.epsilonMillis = epsilonMillis; this.maxLeasesForWorker = maxLeasesForWorker; this.maxLeasesToStealAtOneTime = maxLeasesToStealAtOneTime; @@ -660,8 +660,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { public LeaseCoordinator createLeaseCoordinator(@NonNull final MetricsFactory metricsFactory) { return new DynamoDBLeaseCoordinator(this.createLeaseRefresher(), workerIdentifier, - failoverTimeMillis, - agedFailoverTimeMultiplier, + failoverTimeMillis, enablePriorityLeaseAssignment, epsilonMillis, maxLeasesForWorker, maxLeasesToStealAtOneTime, diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java index e7776738..7020a94b 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java @@ -69,7 +69,7 @@ public class DynamoDBLeaseTaker implements LeaseTaker { // TODO: Remove these defaults and use the defaults in the config private int maxLeasesForWorker = Integer.MAX_VALUE; private int maxLeasesToStealAtOneTime = 1; - + private boolean enablePriorityLeaseAssignment = true; private int veryOldLeaseDurationNanosMultiplier = 3; private long lastScanTimeNanos = 0L; @@ -124,6 +124,11 @@ public class DynamoDBLeaseTaker implements LeaseTaker { return this; } + public DynamoDBLeaseTaker withEnablePriorityLeaseAssignment(boolean enablePriorityLeaseAssignment) { + this.enablePriorityLeaseAssignment = enablePriorityLeaseAssignment; + return this; + } + /** * Max leases to steal from a more loaded Worker at one time (for load balancing). * Setting this to a higher number can allow for faster load convergence (e.g. during deployments, cold starts), @@ -441,25 +446,28 @@ public class DynamoDBLeaseTaker implements LeaseTaker { // If there are leases that have been expired for an extended period of // time, take them with priority, disregarding the target (computed // later) but obeying the maximum limit per worker. - long currentNanoTime; - try { - currentNanoTime = timeProvider.call(); - } catch (Exception e) { - throw new DependencyException("Exception caught from timeProvider", e); - } - final long nanoThreshold = currentNanoTime - (veryOldLeaseDurationNanosMultiplier * leaseDurationNanos); - final List veryOldLeases = allLeases.values().stream() - .filter(lease -> nanoThreshold > lease.lastCounterIncrementNanos()) - .collect(Collectors.toList()); - - if (!veryOldLeases.isEmpty()) { - Collections.shuffle(veryOldLeases); - veryOldLeaseCount = Math.max(0, Math.min(maxLeasesForWorker - currentLeaseCount, veryOldLeases.size())); - HashSet result = new HashSet<>(veryOldLeases.subList(0, veryOldLeaseCount)); - if (veryOldLeaseCount > 0) { - log.info("Taking leases that have been expired for a long time: {}", result); + if (enablePriorityLeaseAssignment) { + long currentNanoTime; + try { + currentNanoTime = timeProvider.call(); + } catch (Exception e) { + throw new DependencyException("Exception caught from timeProvider", e); + } + final long nanoThreshold = currentNanoTime - (veryOldLeaseDurationNanosMultiplier * leaseDurationNanos); + final List veryOldLeases = allLeases.values() + .stream() + .filter(lease -> nanoThreshold > lease.lastCounterIncrementNanos()) + .collect(Collectors.toList()); + + if (!veryOldLeases.isEmpty()) { + Collections.shuffle(veryOldLeases); + veryOldLeaseCount = Math.max(0, Math.min(maxLeasesForWorker - currentLeaseCount, veryOldLeases.size())); + HashSet result = new HashSet<>(veryOldLeases.subList(0, veryOldLeaseCount)); + if (veryOldLeaseCount > 0) { + log.info("Taking leases that have been expired for a long time: {}", result); + } + return result; } - return result; } if (numLeasesToReachTarget <= 0) { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorTest.java index 77b3666c..7347b4cb 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorTest.java @@ -19,7 +19,7 @@ import static org.mockito.Mockito.when; public class DynamoDBLeaseCoordinatorTest { private static final String WORKER_ID = UUID.randomUUID().toString(); - private static final int VERY_OLD_LEASE_DURATION_MULTIPLIER = 5; + private static final boolean ENABLE_PRIORITY_LEASE_ASSIGNMENT = true; private static final long LEASE_DURATION_MILLIS = 5000L; private static final long EPSILON_MILLIS = 25L; private static final int MAX_LEASES_FOR_WORKER = Integer.MAX_VALUE; @@ -40,7 +40,7 @@ public class DynamoDBLeaseCoordinatorTest { @Before public void setup() { this.leaseCoordinator = new DynamoDBLeaseCoordinator(leaseRefresher, WORKER_ID, LEASE_DURATION_MILLIS, - VERY_OLD_LEASE_DURATION_MULTIPLIER, EPSILON_MILLIS, MAX_LEASES_FOR_WORKER, + ENABLE_PRIORITY_LEASE_ASSIGNMENT, EPSILON_MILLIS, MAX_LEASES_FOR_WORKER, MAX_LEASES_TO_STEAL_AT_ONE_TIME, MAX_LEASE_RENEWER_THREAD_COUNT, INITIAL_LEASE_TABLE_READ_CAPACITY, INITIAL_LEASE_TABLE_WRITE_CAPACITY, metricsFactory); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerTest.java index e3a918ff..1700460f 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerTest.java @@ -48,6 +48,7 @@ public class DynamoDBLeaseTakerTest { private static final String WORKER_IDENTIFIER = "foo"; private static final long LEASE_DURATION_MILLIS = 1000L; + private static final int DEFAULT_VERY_OLD_LEASE_DURATION_MULTIPLIER = 3; private static final int VERY_OLD_LEASE_DURATION_MULTIPLIER = 5; private static final long MOCK_CURRENT_TIME = 10000000000L; @@ -151,6 +152,36 @@ public class DynamoDBLeaseTakerTest { assertEquals(expectedOutput, output); } + @Test + public void test_disableEnablePriorityLeaseAssignmentGetsCorrectLeases() throws Exception { + long veryOldThreshold = MOCK_CURRENT_TIME - + (TimeUnit.MILLISECONDS.toNanos(LEASE_DURATION_MILLIS) * DEFAULT_VERY_OLD_LEASE_DURATION_MULTIPLIER); + DynamoDBLeaseTaker dynamoDBLeaseTakerWithDisabledPriorityLeaseAssignment = + new DynamoDBLeaseTaker(leaseRefresher, WORKER_IDENTIFIER, LEASE_DURATION_MILLIS, metricsFactory) + .withEnablePriorityLeaseAssignment(false); + final List allLeases = new ArrayList<>(); + allLeases.add(createLease("bar", "2", MOCK_CURRENT_TIME)); + allLeases.add(createLease("bar", "3", MOCK_CURRENT_TIME)); + allLeases.add(createLease("bar", "4", MOCK_CURRENT_TIME)); + allLeases.add(createLease("baz", "5", veryOldThreshold - 1)); + allLeases.add(createLease("baz", "6", veryOldThreshold + 1)); + allLeases.add(createLease(null, "7")); + final List expiredLeases = allLeases.subList(3, 6); + + dynamoDBLeaseTakerWithDisabledPriorityLeaseAssignment.allLeases.putAll( + allLeases.stream().collect(Collectors.toMap(Lease::leaseKey, Function.identity()))); + when(leaseRefresher.listLeases()).thenReturn(allLeases); + when(metricsFactory.createMetrics()).thenReturn(new NullMetricsScope()); + when(timeProvider.call()).thenReturn(MOCK_CURRENT_TIME); + + Set output = dynamoDBLeaseTakerWithDisabledPriorityLeaseAssignment.computeLeasesToTake(expiredLeases, timeProvider); + final Set expectedOutput = new HashSet<>(); + expectedOutput.add(createLease("baz", "5", veryOldThreshold - 1)); + expectedOutput.add(createLease("baz", "6", veryOldThreshold + 1)); + expectedOutput.add(createLease(null, "7")); + assertEquals(expectedOutput, output); + } + private Lease createLease(String leaseOwner, String leaseKey) { final Lease lease = new Lease(); lease.checkpoint(new ExtendedSequenceNumber("checkpoint"));