diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java
index 12a8fc9c..d1358be4 100644
--- a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java
+++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java
@@ -217,7 +217,7 @@ public class KinesisClientLibConfiguration {
private AwsCredentialsProvider dynamoDBCredentialsProvider;
private AwsCredentialsProvider cloudWatchCredentialsProvider;
private long failoverTimeMillis;
- private int agedFailoverTimeMultiplier;
+ private boolean doPriorityLeaseTaking;
private String workerIdentifier;
private long shardSyncIntervalMillis;
private int maxRecords;
@@ -960,9 +960,8 @@ public class KinesisClientLibConfiguration {
return this;
}
- public KinesisClientLibConfiguration withAgedFailoverTimeMultiplier(int agedFailoverTimeMultiplier) {
- checkIsValuePositive("AgedFailoverTimeMultiplier", agedFailoverTimeMultiplier);
- this.agedFailoverTimeMultiplier = agedFailoverTimeMultiplier;
+ public KinesisClientLibConfiguration withDoPriorityLeaseTaking(boolean doPriorityLeaseTaking) {
+ this.doPriorityLeaseTaking = doPriorityLeaseTaking;
return this;
}
diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java
index dbaa14f9..b857b6c3 100644
--- a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java
+++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java
@@ -87,7 +87,7 @@ public class MultiLangDaemonConfiguration {
@ConfigurationSettable(configurationClass = LeaseManagementConfig.class)
private long failoverTimeMillis;
@ConfigurationSettable(configurationClass = LeaseManagementConfig.class)
- private int agedFailoverTimeMultiplier;
+ private Boolean doPriorityLeaseTaking;
@ConfigurationSettable(configurationClass = LeaseManagementConfig.class)
private long shardSyncIntervalMillis;
@ConfigurationSettable(configurationClass = LeaseManagementConfig.class)
diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfigurationTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfigurationTest.java
index 2bbdde9f..6c419bdc 100644
--- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfigurationTest.java
+++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfigurationTest.java
@@ -91,14 +91,14 @@ public class MultiLangDaemonConfigurationTest {
}
@Test
- public void testSetAgedFailoverTimeMultiplier() {
+ public void testSetDoPriorityLeaseTaking() {
MultiLangDaemonConfiguration configuration = baseConfiguration();
- configuration.setAgedFailoverTimeMultiplier(5);
+ configuration.setDoPriorityLeaseTaking(Boolean.FALSE);
- MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration = configuration
- .resolvedConfiguration(shardRecordProcessorFactory);
+ MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration = configuration.resolvedConfiguration(
+ shardRecordProcessorFactory);
- assertThat(resolvedConfiguration.leaseManagementConfig.agedFailoverTimeMultiplier(), equalTo(5));
+ assertThat(resolvedConfiguration.leaseManagementConfig.doPriorityLeaseTaking(), equalTo(false));
}
@Test
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java
index 58c9f382..2c01d471 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java
@@ -57,7 +57,7 @@ public class LeaseManagementConfig {
public static final long DEFAULT_COMPLETED_LEASE_CLEANUP_INTERVAL_MILLIS = Duration.ofMinutes(5).toMillis();
public static final long DEFAULT_GARBAGE_LEASE_CLEANUP_INTERVAL_MILLIS = Duration.ofMinutes(30).toMillis();
public static final long DEFAULT_PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 2 * 60 * 1000L;
- public static final int DEFAULT_AGED_FAILOVER_TIME_MULTIPLIER = 3;
+ public static final boolean DEFAULT_DO_PRIORITY_LEASE_TAKING = true;
public static final int DEFAULT_CONSECUTIVE_HOLES_FOR_TRIGGERING_LEASE_RECOVERY = 3;
@@ -104,13 +104,14 @@ public class LeaseManagementConfig {
private long failoverTimeMillis = 10000L;
/**
- * Multiplier for the failoverTimeMillis in which leases which are expired for an extended period of time defined by
- * (agedFailoverTimeMultiplier * failoverTimeMillis) are taken with priority, disregarding the target
- * but obeying the maximum limit per worker.
+ * Whether workers should take very expired leases at priority. A very expired lease is when a worker does not
+ * renew its lease in 3 * {@link LeaseManagementConfig#failoverTimeMillis}. Very expired leases will be taken at
+ * priority for a worker which disregards the target leases for the worker but obeys
+ * {@link LeaseManagementConfig#maxLeasesForWorker}
*
- *
Default value: 3
+ * Default value: true
*/
- private int agedFailoverTimeMultiplier = DEFAULT_AGED_FAILOVER_TIME_MULTIPLIER;
+ private boolean doPriorityLeaseTaking = DEFAULT_DO_PRIORITY_LEASE_TAKING;
/**
* Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks.
@@ -380,7 +381,7 @@ public class LeaseManagementConfig {
workerIdentifier(),
executorService(),
failoverTimeMillis(),
- agedFailoverTimeMultiplier(),
+ doPriorityLeaseTaking(),
epsilonMillis(),
maxLeasesForWorker(),
maxLeasesToStealAtOneTime(),
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java
index ef2b236f..c7b83d82 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java
@@ -153,7 +153,7 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
final long initialLeaseTableWriteCapacity,
final MetricsFactory metricsFactory) {
this(leaseRefresher, workerIdentifier, leaseDurationMillis,
- LeaseManagementConfig.DEFAULT_AGED_FAILOVER_TIME_MULTIPLIER, epsilonMillis, maxLeasesForWorker,
+ LeaseManagementConfig.DEFAULT_DO_PRIORITY_LEASE_TAKING, epsilonMillis, maxLeasesForWorker,
maxLeasesToStealAtOneTime, maxLeaseRenewerThreadCount,
TableConstants.DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY,
TableConstants.DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY, metricsFactory);
@@ -168,8 +168,8 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
* Identifies the worker (e.g. useful to track lease ownership)
* @param leaseDurationMillis
* Duration of a lease
- * @param agedFailoverTimeMultiplier
- * Multiplier to determine when leases should be taken at priority
+ * @param doPriorityLeaseTaking
+ * Whether to do priority lease taking for very expired leases
* @param epsilonMillis
* Allow for some variance when calculating lease expirations
* @param maxLeasesForWorker
@@ -186,7 +186,7 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
public DynamoDBLeaseCoordinator(final LeaseRefresher leaseRefresher,
final String workerIdentifier,
final long leaseDurationMillis,
- final int agedFailoverTimeMultiplier,
+ final boolean doPriorityLeaseTaking,
final long epsilonMillis,
final int maxLeasesForWorker,
final int maxLeasesToStealAtOneTime,
@@ -199,7 +199,7 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
this.leaseTaker = new DynamoDBLeaseTaker(leaseRefresher, workerIdentifier, leaseDurationMillis, metricsFactory)
.withMaxLeasesForWorker(maxLeasesForWorker)
.withMaxLeasesToStealAtOneTime(maxLeasesToStealAtOneTime)
- .withVeryOldLeaseDurationNanosMultiplier(agedFailoverTimeMultiplier);
+ .withDoPriorityLeaseTaking(doPriorityLeaseTaking);
this.leaseRenewer = new DynamoDBLeaseRenewer(
leaseRefresher, workerIdentifier, leaseDurationMillis, leaseRenewalThreadpool, metricsFactory);
this.renewerIntervalMillis = getRenewerTakerIntervalMillis(leaseDurationMillis, epsilonMillis);
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java
index e447397b..3d4573d0 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java
@@ -71,7 +71,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
private Function customShardDetectorProvider;
private final long failoverTimeMillis;
- private final int agedFailoverTimeMultiplier;
+ private final boolean doPriorityLeaseTaking;
private final long epsilonMillis;
private final int maxLeasesForWorker;
private final int maxLeasesToStealAtOneTime;
@@ -563,7 +563,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
LeaseCleanupConfig leaseCleanupConfig) {
this(kinesisClient, dynamoDBClient, tableName,
workerIdentifier, executorService, failoverTimeMillis,
- LeaseManagementConfig.DEFAULT_AGED_FAILOVER_TIME_MULTIPLIER, epsilonMillis, maxLeasesForWorker,
+ LeaseManagementConfig.DEFAULT_DO_PRIORITY_LEASE_TAKING, epsilonMillis, maxLeasesForWorker,
maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
@@ -581,7 +581,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
* @param workerIdentifier
* @param executorService
* @param failoverTimeMillis
- * @param agedFailoverTimeMultiplier
+ * @param doPriorityLeaseTaking
* @param epsilonMillis
* @param maxLeasesForWorker
* @param maxLeasesToStealAtOneTime
@@ -610,7 +610,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient,
final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier,
final ExecutorService executorService, final long failoverTimeMillis,
- final int agedFailoverTimeMultiplier, final long epsilonMillis,
+ final boolean doPriorityLeaseTaking, final long epsilonMillis,
final int maxLeasesForWorker, final int maxLeasesToStealAtOneTime, final int maxLeaseRenewalThreads,
final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards,
final long shardSyncIntervalMillis, final boolean consistentReads, final long listShardsBackoffTimeMillis,
@@ -628,7 +628,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
this.workerIdentifier = workerIdentifier;
this.executorService = executorService;
this.failoverTimeMillis = failoverTimeMillis;
- this.agedFailoverTimeMultiplier = agedFailoverTimeMultiplier;
+ this.doPriorityLeaseTaking = doPriorityLeaseTaking;
this.epsilonMillis = epsilonMillis;
this.maxLeasesForWorker = maxLeasesForWorker;
this.maxLeasesToStealAtOneTime = maxLeasesToStealAtOneTime;
@@ -661,7 +661,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
return new DynamoDBLeaseCoordinator(this.createLeaseRefresher(),
workerIdentifier,
failoverTimeMillis,
- agedFailoverTimeMultiplier,
+ doPriorityLeaseTaking,
epsilonMillis,
maxLeasesForWorker,
maxLeasesToStealAtOneTime,
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java
index e7776738..76717ae4 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java
@@ -69,7 +69,7 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
// TODO: Remove these defaults and use the defaults in the config
private int maxLeasesForWorker = Integer.MAX_VALUE;
private int maxLeasesToStealAtOneTime = 1;
-
+ private boolean doPriorityLeaseTaking = true;
private int veryOldLeaseDurationNanosMultiplier = 3;
private long lastScanTimeNanos = 0L;
@@ -124,6 +124,11 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
return this;
}
+ public DynamoDBLeaseTaker withDoPriorityLeaseTaking(boolean doPriorityLeaseTaking) {
+ this.doPriorityLeaseTaking = doPriorityLeaseTaking;
+ return this;
+ }
+
/**
* Max leases to steal from a more loaded Worker at one time (for load balancing).
* Setting this to a higher number can allow for faster load convergence (e.g. during deployments, cold starts),
@@ -441,25 +446,28 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
// If there are leases that have been expired for an extended period of
// time, take them with priority, disregarding the target (computed
// later) but obeying the maximum limit per worker.
- long currentNanoTime;
- try {
- currentNanoTime = timeProvider.call();
- } catch (Exception e) {
- throw new DependencyException("Exception caught from timeProvider", e);
- }
- final long nanoThreshold = currentNanoTime - (veryOldLeaseDurationNanosMultiplier * leaseDurationNanos);
- final List veryOldLeases = allLeases.values().stream()
- .filter(lease -> nanoThreshold > lease.lastCounterIncrementNanos())
- .collect(Collectors.toList());
-
- if (!veryOldLeases.isEmpty()) {
- Collections.shuffle(veryOldLeases);
- veryOldLeaseCount = Math.max(0, Math.min(maxLeasesForWorker - currentLeaseCount, veryOldLeases.size()));
- HashSet result = new HashSet<>(veryOldLeases.subList(0, veryOldLeaseCount));
- if (veryOldLeaseCount > 0) {
- log.info("Taking leases that have been expired for a long time: {}", result);
+ if (doPriorityLeaseTaking) {
+ long currentNanoTime;
+ try {
+ currentNanoTime = timeProvider.call();
+ } catch (Exception e) {
+ throw new DependencyException("Exception caught from timeProvider", e);
+ }
+ final long nanoThreshold = currentNanoTime - (veryOldLeaseDurationNanosMultiplier * leaseDurationNanos);
+ final List veryOldLeases = allLeases.values()
+ .stream()
+ .filter(lease -> nanoThreshold > lease.lastCounterIncrementNanos())
+ .collect(Collectors.toList());
+
+ if (!veryOldLeases.isEmpty()) {
+ Collections.shuffle(veryOldLeases);
+ veryOldLeaseCount = Math.max(0, Math.min(maxLeasesForWorker - currentLeaseCount, veryOldLeases.size()));
+ HashSet result = new HashSet<>(veryOldLeases.subList(0, veryOldLeaseCount));
+ if (veryOldLeaseCount > 0) {
+ log.info("Taking leases that have been expired for a long time: {}", result);
+ }
+ return result;
}
- return result;
}
if (numLeasesToReachTarget <= 0) {
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorTest.java
index 77b3666c..97914bff 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorTest.java
@@ -19,7 +19,7 @@ import static org.mockito.Mockito.when;
public class DynamoDBLeaseCoordinatorTest {
private static final String WORKER_ID = UUID.randomUUID().toString();
- private static final int VERY_OLD_LEASE_DURATION_MULTIPLIER = 5;
+ private static final boolean DO_PRIORITY_LEASE_TAKING = true;
private static final long LEASE_DURATION_MILLIS = 5000L;
private static final long EPSILON_MILLIS = 25L;
private static final int MAX_LEASES_FOR_WORKER = Integer.MAX_VALUE;
@@ -40,7 +40,7 @@ public class DynamoDBLeaseCoordinatorTest {
@Before
public void setup() {
this.leaseCoordinator = new DynamoDBLeaseCoordinator(leaseRefresher, WORKER_ID, LEASE_DURATION_MILLIS,
- VERY_OLD_LEASE_DURATION_MULTIPLIER, EPSILON_MILLIS, MAX_LEASES_FOR_WORKER,
+ DO_PRIORITY_LEASE_TAKING, EPSILON_MILLIS, MAX_LEASES_FOR_WORKER,
MAX_LEASES_TO_STEAL_AT_ONE_TIME, MAX_LEASE_RENEWER_THREAD_COUNT,
INITIAL_LEASE_TABLE_READ_CAPACITY, INITIAL_LEASE_TABLE_WRITE_CAPACITY, metricsFactory);
}
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerTest.java
index e3a918ff..bf62a50e 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerTest.java
@@ -48,6 +48,7 @@ public class DynamoDBLeaseTakerTest {
private static final String WORKER_IDENTIFIER = "foo";
private static final long LEASE_DURATION_MILLIS = 1000L;
+ private static final int DEFAULT_VERY_OLD_LEASE_DURATION_MULTIPLIER = 3;
private static final int VERY_OLD_LEASE_DURATION_MULTIPLIER = 5;
private static final long MOCK_CURRENT_TIME = 10000000000L;
@@ -151,6 +152,32 @@ public class DynamoDBLeaseTakerTest {
assertEquals(expectedOutput, output);
}
+ @Test
+ public void test_disableDoPriorityLeaseTakingGetsCorrectLeases() throws Exception {
+ long veryOldThreshold = MOCK_CURRENT_TIME -
+ (TimeUnit.MILLISECONDS.toNanos(LEASE_DURATION_MILLIS) * DEFAULT_VERY_OLD_LEASE_DURATION_MULTIPLIER);
+ DynamoDBLeaseTaker dynamoDBLeaseTakerWithCustomMultiplier =
+ new DynamoDBLeaseTaker(leaseRefresher, WORKER_IDENTIFIER, LEASE_DURATION_MILLIS, metricsFactory)
+ .withDoPriorityLeaseTaking(false);
+ final List allLeases = new ArrayList<>();
+ allLeases.add(createLease("foo", "2", MOCK_CURRENT_TIME));
+ allLeases.add(createLease("bar", "3", veryOldThreshold - 1));
+ allLeases.add(createLease("baz", "4", veryOldThreshold + 1));
+ final List expiredLeases = allLeases.subList(1, 3);
+
+ dynamoDBLeaseTakerWithCustomMultiplier.allLeases.putAll(
+ allLeases.stream().collect(Collectors.toMap(Lease::leaseKey, Function.identity())));
+ when(leaseRefresher.listLeases()).thenReturn(allLeases);
+ when(metricsFactory.createMetrics()).thenReturn(new NullMetricsScope());
+ when(timeProvider.call()).thenReturn(MOCK_CURRENT_TIME);
+
+ Set output = dynamoDBLeaseTakerWithCustomMultiplier.computeLeasesToTake(expiredLeases, timeProvider);
+ final Set expectedOutput = new HashSet<>();
+ expectedOutput.add(createLease("bar", "3", veryOldThreshold - 1));
+ expectedOutput.add(createLease("baz", "4", veryOldThreshold + 1));
+ assertEquals(expectedOutput, output);
+ }
+
private Lease createLease(String leaseOwner, String leaseKey) {
final Lease lease = new Lease();
lease.checkpoint(new ExtendedSequenceNumber("checkpoint"));