diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java
index bc5d2ac9..12a8fc9c 100644
--- a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java
+++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java
@@ -217,6 +217,7 @@ public class KinesisClientLibConfiguration {
private AwsCredentialsProvider dynamoDBCredentialsProvider;
private AwsCredentialsProvider cloudWatchCredentialsProvider;
private long failoverTimeMillis;
+ private int agedFailoverTimeMultiplier;
private String workerIdentifier;
private long shardSyncIntervalMillis;
private int maxRecords;
@@ -959,6 +960,12 @@ public class KinesisClientLibConfiguration {
return this;
}
+ public KinesisClientLibConfiguration withAgedFailoverTimeMultiplier(int agedFailoverTimeMultiplier) {
+ checkIsValuePositive("AgedFailoverTimeMultiplier", agedFailoverTimeMultiplier);
+ this.agedFailoverTimeMultiplier = agedFailoverTimeMultiplier;
+ return this;
+ }
+
/**
* @param shardSyncIntervalMillis
* Time between tasks to sync leases and Kinesis shards
diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java
index d8f58741..dbaa14f9 100644
--- a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java
+++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java
@@ -87,6 +87,8 @@ public class MultiLangDaemonConfiguration {
@ConfigurationSettable(configurationClass = LeaseManagementConfig.class)
private long failoverTimeMillis;
@ConfigurationSettable(configurationClass = LeaseManagementConfig.class)
+ private int agedFailoverTimeMultiplier;
+ @ConfigurationSettable(configurationClass = LeaseManagementConfig.class)
private long shardSyncIntervalMillis;
@ConfigurationSettable(configurationClass = LeaseManagementConfig.class)
private boolean cleanupLeasesUponShardCompletion;
diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfigurationTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfigurationTest.java
index 13489fe7..2bbdde9f 100644
--- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfigurationTest.java
+++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfigurationTest.java
@@ -90,6 +90,17 @@ public class MultiLangDaemonConfigurationTest {
assertThat(resolvedConfiguration.leaseManagementConfig.maxLeasesForWorker(), equalTo(10));
}
+ @Test
+ public void testSetAgedFailoverTimeMultiplier() {
+ MultiLangDaemonConfiguration configuration = baseConfiguration();
+ configuration.setAgedFailoverTimeMultiplier(5);
+
+ MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration = configuration
+ .resolvedConfiguration(shardRecordProcessorFactory);
+
+ assertThat(resolvedConfiguration.leaseManagementConfig.agedFailoverTimeMultiplier(), equalTo(5));
+ }
+
@Test
public void testDefaultRetrievalConfig() {
MultiLangDaemonConfiguration configuration = baseConfiguration();
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java
index 1fecbe7c..58c9f382 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java
@@ -57,6 +57,7 @@ public class LeaseManagementConfig {
public static final long DEFAULT_COMPLETED_LEASE_CLEANUP_INTERVAL_MILLIS = Duration.ofMinutes(5).toMillis();
public static final long DEFAULT_GARBAGE_LEASE_CLEANUP_INTERVAL_MILLIS = Duration.ofMinutes(30).toMillis();
public static final long DEFAULT_PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 2 * 60 * 1000L;
+ public static final int DEFAULT_AGED_FAILOVER_TIME_MULTIPLIER = 3;
public static final int DEFAULT_CONSECUTIVE_HOLES_FOR_TRIGGERING_LEASE_RECOVERY = 3;
@@ -102,6 +103,15 @@ public class LeaseManagementConfig {
*/
private long failoverTimeMillis = 10000L;
+ /**
+ * Multiplier for the failoverTimeMillis in which leases which are expired for an extended period of time defined by
+ * (agedFailoverTimeMultiplier * failoverTimeMillis) are taken with priority, disregarding the target
+ * but obeying the maximum limit per worker.
+ *
+ *
Default value: 3
+ */
+ private int agedFailoverTimeMultiplier = DEFAULT_AGED_FAILOVER_TIME_MULTIPLIER;
+
/**
* Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks.
*
@@ -370,6 +380,7 @@ public class LeaseManagementConfig {
workerIdentifier(),
executorService(),
failoverTimeMillis(),
+ agedFailoverTimeMultiplier(),
epsilonMillis(),
maxLeasesForWorker(),
maxLeasesToStealAtOneTime(),
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java
index da6d8e07..ef2b236f 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java
@@ -34,6 +34,7 @@ import lombok.extern.slf4j.Slf4j;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseCoordinator;
+import software.amazon.kinesis.leases.LeaseManagementConfig;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.LeaseRenewer;
import software.amazon.kinesis.leases.LeaseTaker;
@@ -140,6 +141,7 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
* @param metricsFactory
* Used to publish metrics about lease operations
*/
+ @Deprecated
public DynamoDBLeaseCoordinator(final LeaseRefresher leaseRefresher,
final String workerIdentifier,
final long leaseDurationMillis,
@@ -150,11 +152,54 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
final long initialLeaseTableReadCapacity,
final long initialLeaseTableWriteCapacity,
final MetricsFactory metricsFactory) {
+ this(leaseRefresher, workerIdentifier, leaseDurationMillis,
+ LeaseManagementConfig.DEFAULT_AGED_FAILOVER_TIME_MULTIPLIER, epsilonMillis, maxLeasesForWorker,
+ maxLeasesToStealAtOneTime, maxLeaseRenewerThreadCount,
+ TableConstants.DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY,
+ TableConstants.DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY, metricsFactory);
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param leaseRefresher
+ * LeaseRefresher instance to use
+ * @param workerIdentifier
+ * Identifies the worker (e.g. useful to track lease ownership)
+ * @param leaseDurationMillis
+ * Duration of a lease
+ * @param agedFailoverTimeMultiplier
+ * Multiplier to determine when leases should be taken at priority
+ * @param epsilonMillis
+ * Allow for some variance when calculating lease expirations
+ * @param maxLeasesForWorker
+ * Max leases this Worker can handle at a time
+ * @param maxLeasesToStealAtOneTime
+ * Steal up to these many leases at a time (for load balancing)
+ * @param initialLeaseTableReadCapacity
+ * Initial dynamodb lease table read iops if creating the lease table
+ * @param initialLeaseTableWriteCapacity
+ * Initial dynamodb lease table write iops if creating the lease table
+ * @param metricsFactory
+ * Used to publish metrics about lease operations
+ */
+ public DynamoDBLeaseCoordinator(final LeaseRefresher leaseRefresher,
+ final String workerIdentifier,
+ final long leaseDurationMillis,
+ final int agedFailoverTimeMultiplier,
+ final long epsilonMillis,
+ final int maxLeasesForWorker,
+ final int maxLeasesToStealAtOneTime,
+ final int maxLeaseRenewerThreadCount,
+ final long initialLeaseTableReadCapacity,
+ final long initialLeaseTableWriteCapacity,
+ final MetricsFactory metricsFactory) {
this.leaseRefresher = leaseRefresher;
this.leaseRenewalThreadpool = getLeaseRenewalExecutorService(maxLeaseRenewerThreadCount);
this.leaseTaker = new DynamoDBLeaseTaker(leaseRefresher, workerIdentifier, leaseDurationMillis, metricsFactory)
.withMaxLeasesForWorker(maxLeasesForWorker)
- .withMaxLeasesToStealAtOneTime(maxLeasesToStealAtOneTime);
+ .withMaxLeasesToStealAtOneTime(maxLeasesToStealAtOneTime)
+ .withVeryOldLeaseDurationNanosMultiplier(agedFailoverTimeMultiplier);
this.leaseRenewer = new DynamoDBLeaseRenewer(
leaseRefresher, workerIdentifier, leaseDurationMillis, leaseRenewalThreadpool, metricsFactory);
this.renewerIntervalMillis = getRenewerTakerIntervalMillis(leaseDurationMillis, epsilonMillis);
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java
index f11e2f0a..e447397b 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java
@@ -71,6 +71,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
private Function customShardDetectorProvider;
private final long failoverTimeMillis;
+ private final int agedFailoverTimeMultiplier;
private final long epsilonMillis;
private final int maxLeasesForWorker;
private final int maxLeasesToStealAtOneTime;
@@ -487,6 +488,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
* @param leaseTableDeletionProtectionEnabled
* @param tags
*/
+ @Deprecated
private DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final StreamConfig streamConfig,
final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier,
final ExecutorService executorService, final long failoverTimeMillis, final long epsilonMillis,
@@ -544,6 +546,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
* @param isMultiStreamMode
* @param leaseCleanupConfig
*/
+ @Deprecated
public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient,
final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier,
final ExecutorService executorService, final long failoverTimeMillis, final long epsilonMillis,
@@ -558,12 +561,74 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
Collection tags, LeaseSerializer leaseSerializer,
Function customShardDetectorProvider, boolean isMultiStreamMode,
LeaseCleanupConfig leaseCleanupConfig) {
+ this(kinesisClient, dynamoDBClient, tableName,
+ workerIdentifier, executorService, failoverTimeMillis,
+ LeaseManagementConfig.DEFAULT_AGED_FAILOVER_TIME_MULTIPLIER, epsilonMillis, maxLeasesForWorker,
+ maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion,
+ ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
+ maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
+ cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity,
+ deprecatedHierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode,
+ leaseTableDeletionProtectionEnabled, tags, leaseSerializer, customShardDetectorProvider, isMultiStreamMode,
+ leaseCleanupConfig);
+ }
+
+ /**
+ * Constructor.
+ * @param kinesisClient
+ * @param dynamoDBClient
+ * @param tableName
+ * @param workerIdentifier
+ * @param executorService
+ * @param failoverTimeMillis
+ * @param agedFailoverTimeMultiplier
+ * @param epsilonMillis
+ * @param maxLeasesForWorker
+ * @param maxLeasesToStealAtOneTime
+ * @param maxLeaseRenewalThreads
+ * @param cleanupLeasesUponShardCompletion
+ * @param ignoreUnexpectedChildShards
+ * @param shardSyncIntervalMillis
+ * @param consistentReads
+ * @param listShardsBackoffTimeMillis
+ * @param maxListShardsRetryAttempts
+ * @param maxCacheMissesBeforeReload
+ * @param listShardsCacheAllowedAgeInSeconds
+ * @param cacheMissWarningModulus
+ * @param initialLeaseTableReadCapacity
+ * @param initialLeaseTableWriteCapacity
+ * @param deprecatedHierarchicalShardSyncer
+ * @param tableCreatorCallback
+ * @param dynamoDbRequestTimeout
+ * @param billingMode
+ * @param leaseTableDeletionProtectionEnabled
+ * @param leaseSerializer
+ * @param customShardDetectorProvider
+ * @param isMultiStreamMode
+ * @param leaseCleanupConfig
+ */
+ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient,
+ final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier,
+ final ExecutorService executorService, final long failoverTimeMillis,
+ final int agedFailoverTimeMultiplier, final long epsilonMillis,
+ final int maxLeasesForWorker, final int maxLeasesToStealAtOneTime, final int maxLeaseRenewalThreads,
+ final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards,
+ final long shardSyncIntervalMillis, final boolean consistentReads, final long listShardsBackoffTimeMillis,
+ final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload,
+ final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus,
+ final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity,
+ final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
+ Duration dynamoDbRequestTimeout, BillingMode billingMode, final boolean leaseTableDeletionProtectionEnabled,
+ Collection tags, LeaseSerializer leaseSerializer,
+ Function customShardDetectorProvider, boolean isMultiStreamMode,
+ LeaseCleanupConfig leaseCleanupConfig) {
this.kinesisClient = kinesisClient;
this.dynamoDBClient = dynamoDBClient;
this.tableName = tableName;
this.workerIdentifier = workerIdentifier;
this.executorService = executorService;
this.failoverTimeMillis = failoverTimeMillis;
+ this.agedFailoverTimeMultiplier = agedFailoverTimeMultiplier;
this.epsilonMillis = epsilonMillis;
this.maxLeasesForWorker = maxLeasesForWorker;
this.maxLeasesToStealAtOneTime = maxLeasesToStealAtOneTime;
@@ -596,6 +661,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
return new DynamoDBLeaseCoordinator(this.createLeaseRefresher(),
workerIdentifier,
failoverTimeMillis,
+ agedFailoverTimeMultiplier,
epsilonMillis,
maxLeasesForWorker,
maxLeasesToStealAtOneTime,
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java
index 9fb91f14..e7776738 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java
@@ -70,7 +70,7 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
private int maxLeasesForWorker = Integer.MAX_VALUE;
private int maxLeasesToStealAtOneTime = 1;
- private long veryOldLeaseDurationNanosMultiplier = 3;
+ private int veryOldLeaseDurationNanosMultiplier = 3;
private long lastScanTimeNanos = 0L;
public DynamoDBLeaseTaker(LeaseRefresher leaseRefresher, String workerIdentifier, long leaseDurationMillis,
@@ -103,15 +103,24 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
return this;
}
+ /**
+ * @deprecated Misspelled method, use {@link DynamoDBLeaseTaker#withVeryOldLeaseDurationNanosMultiplier(int)}
+ */
+ @Deprecated
+ public DynamoDBLeaseTaker withVeryOldLeaseDurationNanosMultipler(long veryOldLeaseDurationNanosMultipler) {
+ this.veryOldLeaseDurationNanosMultiplier = (int) veryOldLeaseDurationNanosMultipler;
+ return this;
+ }
+
/**
* Overrides the default very old lease duration nanos multiplier to increase the threshold for taking very old leases.
* Setting this to a higher value than 3 will increase the threshold for very old lease taking.
*
- * @param veryOldLeaseDurationNanosMultipler Very old lease duration multiplier for adjusting very old lease taking.
+ * @param veryOldLeaseDurationNanosMultiplier Very old lease duration multiplier for adjusting very old lease taking.
* @return LeaseTaker
*/
- public DynamoDBLeaseTaker withVeryOldLeaseDurationNanosMultipler(long veryOldLeaseDurationNanosMultipler) {
- this.veryOldLeaseDurationNanosMultiplier = veryOldLeaseDurationNanosMultipler;
+ public DynamoDBLeaseTaker withVeryOldLeaseDurationNanosMultiplier(int veryOldLeaseDurationNanosMultiplier) {
+ this.veryOldLeaseDurationNanosMultiplier = veryOldLeaseDurationNanosMultiplier;
return this;
}
@@ -191,7 +200,7 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
List expiredLeases = getExpiredLeases();
- Set leasesToTake = computeLeasesToTake(expiredLeases);
+ Set leasesToTake = computeLeasesToTake(expiredLeases, timeProvider);
leasesToTake = updateStaleLeasesWithLatestState(updateAllLeasesTotalTimeMillis, leasesToTake);
Set untakenLeaseKeys = new HashSet<>();
@@ -374,9 +383,11 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
* Compute the number of leases I should try to take based on the state of the system.
*
* @param expiredLeases list of leases we determined to be expired
+ * @param timeProvider callable which returns the current time in nanos
* @return set of leases to take.
*/
- private Set computeLeasesToTake(List expiredLeases) {
+ @VisibleForTesting
+ Set computeLeasesToTake(List expiredLeases, Callable timeProvider) throws DependencyException {
Map leaseCounts = computeLeaseCounts(expiredLeases);
Set leasesToTake = new HashSet<>();
final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, TAKE_LEASES_DIMENSION);
@@ -430,7 +441,13 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
// If there are leases that have been expired for an extended period of
// time, take them with priority, disregarding the target (computed
// later) but obeying the maximum limit per worker.
- final long nanoThreshold = System.nanoTime() - (veryOldLeaseDurationNanosMultiplier * leaseDurationNanos);
+ long currentNanoTime;
+ try {
+ currentNanoTime = timeProvider.call();
+ } catch (Exception e) {
+ throw new DependencyException("Exception caught from timeProvider", e);
+ }
+ final long nanoThreshold = currentNanoTime - (veryOldLeaseDurationNanosMultiplier * leaseDurationNanos);
final List veryOldLeases = allLeases.values().stream()
.filter(lease -> nanoThreshold > lease.lastCounterIncrementNanos())
.collect(Collectors.toList());
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorTest.java
index cf1c536b..77b3666c 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorTest.java
@@ -19,6 +19,7 @@ import static org.mockito.Mockito.when;
public class DynamoDBLeaseCoordinatorTest {
private static final String WORKER_ID = UUID.randomUUID().toString();
+ private static final int VERY_OLD_LEASE_DURATION_MULTIPLIER = 5;
private static final long LEASE_DURATION_MILLIS = 5000L;
private static final long EPSILON_MILLIS = 25L;
private static final int MAX_LEASES_FOR_WORKER = Integer.MAX_VALUE;
@@ -39,7 +40,8 @@ public class DynamoDBLeaseCoordinatorTest {
@Before
public void setup() {
this.leaseCoordinator = new DynamoDBLeaseCoordinator(leaseRefresher, WORKER_ID, LEASE_DURATION_MILLIS,
- EPSILON_MILLIS, MAX_LEASES_FOR_WORKER, MAX_LEASES_TO_STEAL_AT_ONE_TIME, MAX_LEASE_RENEWER_THREAD_COUNT,
+ VERY_OLD_LEASE_DURATION_MULTIPLIER, EPSILON_MILLIS, MAX_LEASES_FOR_WORKER,
+ MAX_LEASES_TO_STEAL_AT_ONE_TIME, MAX_LEASE_RENEWER_THREAD_COUNT,
INITIAL_LEASE_TABLE_READ_CAPACITY, INITIAL_LEASE_TABLE_WRITE_CAPACITY, metricsFactory);
}
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerIntegrationTest.java
index d6c2d0b3..a6dac253 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerIntegrationTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerIntegrationTest.java
@@ -102,7 +102,7 @@ public class DynamoDBLeaseTakerIntegrationTest extends LeaseIntegrationTest {
builder.withLease("4", "bar").build();
// setting multiplier to unusually high number to avoid very old lease taking
- taker.withVeryOldLeaseDurationNanosMultipler(5000000000L);
+ taker.withVeryOldLeaseDurationNanosMultiplier(5000000);
builder.takeMutateAssert(taker, 2);
}
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerTest.java
index b6e74a6b..e3a918ff 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerTest.java
@@ -22,7 +22,9 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -46,6 +48,8 @@ public class DynamoDBLeaseTakerTest {
private static final String WORKER_IDENTIFIER = "foo";
private static final long LEASE_DURATION_MILLIS = 1000L;
+ private static final int VERY_OLD_LEASE_DURATION_MULTIPLIER = 5;
+ private static final long MOCK_CURRENT_TIME = 10000000000L;
private DynamoDBLeaseTaker dynamoDBLeaseTaker;
@@ -88,7 +92,7 @@ public class DynamoDBLeaseTakerTest {
when(leaseRefresher.listLeases()).thenReturn(leases);
when(metricsFactory.createMetrics()).thenReturn(new NullMetricsScope());
- when(timeProvider.call()).thenReturn(1000L);
+ when(timeProvider.call()).thenReturn(MOCK_CURRENT_TIME);
final Map actualOutput = dynamoDBLeaseTaker.computeLeaseCounts(ImmutableList.of());
@@ -112,7 +116,7 @@ public class DynamoDBLeaseTakerTest {
when(leaseRefresher.listLeases()).thenReturn(leases);
when(metricsFactory.createMetrics()).thenReturn(new NullMetricsScope());
- when(timeProvider.call()).thenReturn(1000L);
+ when(timeProvider.call()).thenReturn(MOCK_CURRENT_TIME);
final Map actualOutput = dynamoDBLeaseTaker.computeLeaseCounts(leases);
@@ -121,6 +125,32 @@ public class DynamoDBLeaseTakerTest {
assertEquals(expectedOutput, actualOutput);
}
+ @Test
+ public void test_veryOldLeaseDurationNanosMultiplierGetsCorrectLeases() throws Exception {
+ long veryOldThreshold = MOCK_CURRENT_TIME -
+ (TimeUnit.MILLISECONDS.toNanos(LEASE_DURATION_MILLIS) * VERY_OLD_LEASE_DURATION_MULTIPLIER);
+ DynamoDBLeaseTaker dynamoDBLeaseTakerWithCustomMultiplier =
+ new DynamoDBLeaseTaker(leaseRefresher, WORKER_IDENTIFIER, LEASE_DURATION_MILLIS, metricsFactory)
+ .withVeryOldLeaseDurationNanosMultiplier(VERY_OLD_LEASE_DURATION_MULTIPLIER);
+ final List allLeases = new ImmutableList.Builder()
+ .add(createLease("foo", "2", MOCK_CURRENT_TIME))
+ .add(createLease("bar", "3", veryOldThreshold - 1))
+ .add(createLease("baz", "4", veryOldThreshold))
+ .build();
+ final List expiredLeases = allLeases.subList(1, 3);
+
+ dynamoDBLeaseTakerWithCustomMultiplier.allLeases.putAll(
+ allLeases.stream().collect(Collectors.toMap(Lease::leaseKey, Function.identity())));
+ when(leaseRefresher.listLeases()).thenReturn(allLeases);
+ when(metricsFactory.createMetrics()).thenReturn(new NullMetricsScope());
+ when(timeProvider.call()).thenReturn(MOCK_CURRENT_TIME);
+
+ Set output = dynamoDBLeaseTakerWithCustomMultiplier.computeLeasesToTake(expiredLeases, timeProvider);
+ final Set expectedOutput = new HashSet<>();
+ expectedOutput.add(allLeases.get(1));
+ assertEquals(expectedOutput, output);
+ }
+
private Lease createLease(String leaseOwner, String leaseKey) {
final Lease lease = new Lease();
lease.checkpoint(new ExtendedSequenceNumber("checkpoint"));
@@ -132,4 +162,17 @@ public class DynamoDBLeaseTakerTest {
lease.leaseKey(leaseKey);
return lease;
}
+
+ private Lease createLease(String leaseOwner, String leaseKey, long lastCounterIncrementNanos) {
+ final Lease lease = new Lease();
+ lease.checkpoint(new ExtendedSequenceNumber("checkpoint"));
+ lease.ownerSwitchesSinceCheckpoint(0L);
+ lease.leaseCounter(0L);
+ lease.leaseOwner(leaseOwner);
+ lease.parentShardIds(Collections.singleton("parentShardId"));
+ lease.childShardIds(new HashSet<>());
+ lease.leaseKey(leaseKey);
+ lease.lastCounterIncrementNanos(lastCounterIncrementNanos);
+ return lease;
+ }
}