diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java index d1358be4..d8d9068d 100644 --- a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java +++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java @@ -217,7 +217,7 @@ public class KinesisClientLibConfiguration { private AwsCredentialsProvider dynamoDBCredentialsProvider; private AwsCredentialsProvider cloudWatchCredentialsProvider; private long failoverTimeMillis; - private boolean doPriorityLeaseTaking; + private boolean enablePriorityLeaseAssignment; private String workerIdentifier; private long shardSyncIntervalMillis; private int maxRecords; @@ -960,8 +960,8 @@ public class KinesisClientLibConfiguration { return this; } - public KinesisClientLibConfiguration withDoPriorityLeaseTaking(boolean doPriorityLeaseTaking) { - this.doPriorityLeaseTaking = doPriorityLeaseTaking; + public KinesisClientLibConfiguration withEnablePriorityLeaseAssignment(boolean enablePriorityLeaseAssignment) { + this.enablePriorityLeaseAssignment = enablePriorityLeaseAssignment; return this; } diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java index b857b6c3..8b6bc5e6 100644 --- a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java +++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java @@ -87,7 +87,7 @@ public class MultiLangDaemonConfiguration { @ConfigurationSettable(configurationClass = LeaseManagementConfig.class) private long failoverTimeMillis; @ConfigurationSettable(configurationClass = LeaseManagementConfig.class) - private Boolean doPriorityLeaseTaking; + private Boolean enablePriorityLeaseAssignment; @ConfigurationSettable(configurationClass = LeaseManagementConfig.class) private long shardSyncIntervalMillis; @ConfigurationSettable(configurationClass = LeaseManagementConfig.class) diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfigurationTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfigurationTest.java index 6c419bdc..da18e659 100644 --- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfigurationTest.java +++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfigurationTest.java @@ -91,14 +91,14 @@ public class MultiLangDaemonConfigurationTest { } @Test - public void testSetDoPriorityLeaseTaking() { + public void testSetEnablePriorityLeaseAssignment() { MultiLangDaemonConfiguration configuration = baseConfiguration(); - configuration.setDoPriorityLeaseTaking(Boolean.FALSE); + configuration.setEnablePriorityLeaseAssignment(false); MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration = configuration.resolvedConfiguration( shardRecordProcessorFactory); - assertThat(resolvedConfiguration.leaseManagementConfig.doPriorityLeaseTaking(), equalTo(false)); + assertThat(resolvedConfiguration.leaseManagementConfig.enablePriorityLeaseAssignment(), equalTo(false)); } @Test diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java index 2c01d471..aef4d87e 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java @@ -57,7 +57,7 @@ public class LeaseManagementConfig { public static final long DEFAULT_COMPLETED_LEASE_CLEANUP_INTERVAL_MILLIS = Duration.ofMinutes(5).toMillis(); public static final long DEFAULT_GARBAGE_LEASE_CLEANUP_INTERVAL_MILLIS = Duration.ofMinutes(30).toMillis(); public static final long DEFAULT_PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 2 * 60 * 1000L; - public static final boolean DEFAULT_DO_PRIORITY_LEASE_TAKING = true; + public static final boolean DEFAULT_ENABLE_PRIORITY_LEASE_ASSIGNMENT = true; public static final int DEFAULT_CONSECUTIVE_HOLES_FOR_TRIGGERING_LEASE_RECOVERY = 3; @@ -107,11 +107,12 @@ public class LeaseManagementConfig { * Whether workers should take very expired leases at priority. A very expired lease is when a worker does not * renew its lease in 3 * {@link LeaseManagementConfig#failoverTimeMillis}. Very expired leases will be taken at * priority for a worker which disregards the target leases for the worker but obeys - * {@link LeaseManagementConfig#maxLeasesForWorker} + * {@link LeaseManagementConfig#maxLeasesForWorker}. New leases for new shards due to shard mutation are + * considered to be very expired and taken with priority. * *

Default value: true

*/ - private boolean doPriorityLeaseTaking = DEFAULT_DO_PRIORITY_LEASE_TAKING; + private boolean enablePriorityLeaseAssignment = DEFAULT_ENABLE_PRIORITY_LEASE_ASSIGNMENT; /** * Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks. @@ -381,7 +382,7 @@ public class LeaseManagementConfig { workerIdentifier(), executorService(), failoverTimeMillis(), - doPriorityLeaseTaking(), + enablePriorityLeaseAssignment(), epsilonMillis(), maxLeasesForWorker(), maxLeasesToStealAtOneTime(), diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java index c7b83d82..647f3c35 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java @@ -153,7 +153,7 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator { final long initialLeaseTableWriteCapacity, final MetricsFactory metricsFactory) { this(leaseRefresher, workerIdentifier, leaseDurationMillis, - LeaseManagementConfig.DEFAULT_DO_PRIORITY_LEASE_TAKING, epsilonMillis, maxLeasesForWorker, + LeaseManagementConfig.DEFAULT_ENABLE_PRIORITY_LEASE_ASSIGNMENT, epsilonMillis, maxLeasesForWorker, maxLeasesToStealAtOneTime, maxLeaseRenewerThreadCount, TableConstants.DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY, TableConstants.DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY, metricsFactory); @@ -168,8 +168,8 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator { * Identifies the worker (e.g. useful to track lease ownership) * @param leaseDurationMillis * Duration of a lease - * @param doPriorityLeaseTaking - * Whether to do priority lease taking for very expired leases + * @param enablePriorityLeaseAssignment + * Whether to do priority lease assignment for very expired leases * @param epsilonMillis * Allow for some variance when calculating lease expirations * @param maxLeasesForWorker @@ -186,7 +186,7 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator { public DynamoDBLeaseCoordinator(final LeaseRefresher leaseRefresher, final String workerIdentifier, final long leaseDurationMillis, - final boolean doPriorityLeaseTaking, + final boolean enablePriorityLeaseAssignment, final long epsilonMillis, final int maxLeasesForWorker, final int maxLeasesToStealAtOneTime, @@ -199,7 +199,7 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator { this.leaseTaker = new DynamoDBLeaseTaker(leaseRefresher, workerIdentifier, leaseDurationMillis, metricsFactory) .withMaxLeasesForWorker(maxLeasesForWorker) .withMaxLeasesToStealAtOneTime(maxLeasesToStealAtOneTime) - .withDoPriorityLeaseTaking(doPriorityLeaseTaking); + .withEnablePriorityLeaseAssignment(enablePriorityLeaseAssignment); this.leaseRenewer = new DynamoDBLeaseRenewer( leaseRefresher, workerIdentifier, leaseDurationMillis, leaseRenewalThreadpool, metricsFactory); this.renewerIntervalMillis = getRenewerTakerIntervalMillis(leaseDurationMillis, epsilonMillis); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java index 3d4573d0..7d9ebeef 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java @@ -71,7 +71,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { private Function customShardDetectorProvider; private final long failoverTimeMillis; - private final boolean doPriorityLeaseTaking; + private final boolean enablePriorityLeaseAssignment; private final long epsilonMillis; private final int maxLeasesForWorker; private final int maxLeasesToStealAtOneTime; @@ -563,7 +563,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { LeaseCleanupConfig leaseCleanupConfig) { this(kinesisClient, dynamoDBClient, tableName, workerIdentifier, executorService, failoverTimeMillis, - LeaseManagementConfig.DEFAULT_DO_PRIORITY_LEASE_TAKING, epsilonMillis, maxLeasesForWorker, + LeaseManagementConfig.DEFAULT_ENABLE_PRIORITY_LEASE_ASSIGNMENT, epsilonMillis, maxLeasesForWorker, maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis, maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds, @@ -581,7 +581,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { * @param workerIdentifier * @param executorService * @param failoverTimeMillis - * @param doPriorityLeaseTaking + * @param enablePriorityLeaseAssignment * @param epsilonMillis * @param maxLeasesForWorker * @param maxLeasesToStealAtOneTime @@ -610,7 +610,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier, final ExecutorService executorService, final long failoverTimeMillis, - final boolean doPriorityLeaseTaking, final long epsilonMillis, + final boolean enablePriorityLeaseAssignment, final long epsilonMillis, final int maxLeasesForWorker, final int maxLeasesToStealAtOneTime, final int maxLeaseRenewalThreads, final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards, final long shardSyncIntervalMillis, final boolean consistentReads, final long listShardsBackoffTimeMillis, @@ -628,7 +628,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { this.workerIdentifier = workerIdentifier; this.executorService = executorService; this.failoverTimeMillis = failoverTimeMillis; - this.doPriorityLeaseTaking = doPriorityLeaseTaking; + this.enablePriorityLeaseAssignment = enablePriorityLeaseAssignment; this.epsilonMillis = epsilonMillis; this.maxLeasesForWorker = maxLeasesForWorker; this.maxLeasesToStealAtOneTime = maxLeasesToStealAtOneTime; @@ -660,8 +660,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { public LeaseCoordinator createLeaseCoordinator(@NonNull final MetricsFactory metricsFactory) { return new DynamoDBLeaseCoordinator(this.createLeaseRefresher(), workerIdentifier, - failoverTimeMillis, - doPriorityLeaseTaking, + failoverTimeMillis, enablePriorityLeaseAssignment, epsilonMillis, maxLeasesForWorker, maxLeasesToStealAtOneTime, diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java index 76717ae4..7020a94b 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java @@ -69,7 +69,7 @@ public class DynamoDBLeaseTaker implements LeaseTaker { // TODO: Remove these defaults and use the defaults in the config private int maxLeasesForWorker = Integer.MAX_VALUE; private int maxLeasesToStealAtOneTime = 1; - private boolean doPriorityLeaseTaking = true; + private boolean enablePriorityLeaseAssignment = true; private int veryOldLeaseDurationNanosMultiplier = 3; private long lastScanTimeNanos = 0L; @@ -124,8 +124,8 @@ public class DynamoDBLeaseTaker implements LeaseTaker { return this; } - public DynamoDBLeaseTaker withDoPriorityLeaseTaking(boolean doPriorityLeaseTaking) { - this.doPriorityLeaseTaking = doPriorityLeaseTaking; + public DynamoDBLeaseTaker withEnablePriorityLeaseAssignment(boolean enablePriorityLeaseAssignment) { + this.enablePriorityLeaseAssignment = enablePriorityLeaseAssignment; return this; } @@ -446,7 +446,7 @@ public class DynamoDBLeaseTaker implements LeaseTaker { // If there are leases that have been expired for an extended period of // time, take them with priority, disregarding the target (computed // later) but obeying the maximum limit per worker. - if (doPriorityLeaseTaking) { + if (enablePriorityLeaseAssignment) { long currentNanoTime; try { currentNanoTime = timeProvider.call(); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerTest.java index bf62a50e..1700460f 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerTest.java @@ -153,28 +153,32 @@ public class DynamoDBLeaseTakerTest { } @Test - public void test_disableDoPriorityLeaseTakingGetsCorrectLeases() throws Exception { + public void test_disableEnablePriorityLeaseAssignmentGetsCorrectLeases() throws Exception { long veryOldThreshold = MOCK_CURRENT_TIME - (TimeUnit.MILLISECONDS.toNanos(LEASE_DURATION_MILLIS) * DEFAULT_VERY_OLD_LEASE_DURATION_MULTIPLIER); - DynamoDBLeaseTaker dynamoDBLeaseTakerWithCustomMultiplier = + DynamoDBLeaseTaker dynamoDBLeaseTakerWithDisabledPriorityLeaseAssignment = new DynamoDBLeaseTaker(leaseRefresher, WORKER_IDENTIFIER, LEASE_DURATION_MILLIS, metricsFactory) - .withDoPriorityLeaseTaking(false); + .withEnablePriorityLeaseAssignment(false); final List allLeases = new ArrayList<>(); - allLeases.add(createLease("foo", "2", MOCK_CURRENT_TIME)); - allLeases.add(createLease("bar", "3", veryOldThreshold - 1)); - allLeases.add(createLease("baz", "4", veryOldThreshold + 1)); - final List expiredLeases = allLeases.subList(1, 3); + allLeases.add(createLease("bar", "2", MOCK_CURRENT_TIME)); + allLeases.add(createLease("bar", "3", MOCK_CURRENT_TIME)); + allLeases.add(createLease("bar", "4", MOCK_CURRENT_TIME)); + allLeases.add(createLease("baz", "5", veryOldThreshold - 1)); + allLeases.add(createLease("baz", "6", veryOldThreshold + 1)); + allLeases.add(createLease(null, "7")); + final List expiredLeases = allLeases.subList(3, 6); - dynamoDBLeaseTakerWithCustomMultiplier.allLeases.putAll( + dynamoDBLeaseTakerWithDisabledPriorityLeaseAssignment.allLeases.putAll( allLeases.stream().collect(Collectors.toMap(Lease::leaseKey, Function.identity()))); when(leaseRefresher.listLeases()).thenReturn(allLeases); when(metricsFactory.createMetrics()).thenReturn(new NullMetricsScope()); when(timeProvider.call()).thenReturn(MOCK_CURRENT_TIME); - Set output = dynamoDBLeaseTakerWithCustomMultiplier.computeLeasesToTake(expiredLeases, timeProvider); + Set output = dynamoDBLeaseTakerWithDisabledPriorityLeaseAssignment.computeLeasesToTake(expiredLeases, timeProvider); final Set expectedOutput = new HashSet<>(); - expectedOutput.add(createLease("bar", "3", veryOldThreshold - 1)); - expectedOutput.add(createLease("baz", "4", veryOldThreshold + 1)); + expectedOutput.add(createLease("baz", "5", veryOldThreshold - 1)); + expectedOutput.add(createLease("baz", "6", veryOldThreshold + 1)); + expectedOutput.add(createLease(null, "7")); assertEquals(expectedOutput, output); }