Minor comment updates. Update test. Rename config variable.
This commit is contained in:
parent
c7957d9a86
commit
99794ab2ab
8 changed files with 42 additions and 38 deletions
|
|
@ -217,7 +217,7 @@ public class KinesisClientLibConfiguration {
|
|||
private AwsCredentialsProvider dynamoDBCredentialsProvider;
|
||||
private AwsCredentialsProvider cloudWatchCredentialsProvider;
|
||||
private long failoverTimeMillis;
|
||||
private boolean doPriorityLeaseTaking;
|
||||
private boolean enablePriorityLeaseAssignment;
|
||||
private String workerIdentifier;
|
||||
private long shardSyncIntervalMillis;
|
||||
private int maxRecords;
|
||||
|
|
@ -960,8 +960,8 @@ public class KinesisClientLibConfiguration {
|
|||
return this;
|
||||
}
|
||||
|
||||
public KinesisClientLibConfiguration withDoPriorityLeaseTaking(boolean doPriorityLeaseTaking) {
|
||||
this.doPriorityLeaseTaking = doPriorityLeaseTaking;
|
||||
public KinesisClientLibConfiguration withEnablePriorityLeaseAssignment(boolean enablePriorityLeaseAssignment) {
|
||||
this.enablePriorityLeaseAssignment = enablePriorityLeaseAssignment;
|
||||
return this;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -87,7 +87,7 @@ public class MultiLangDaemonConfiguration {
|
|||
@ConfigurationSettable(configurationClass = LeaseManagementConfig.class)
|
||||
private long failoverTimeMillis;
|
||||
@ConfigurationSettable(configurationClass = LeaseManagementConfig.class)
|
||||
private Boolean doPriorityLeaseTaking;
|
||||
private Boolean enablePriorityLeaseAssignment;
|
||||
@ConfigurationSettable(configurationClass = LeaseManagementConfig.class)
|
||||
private long shardSyncIntervalMillis;
|
||||
@ConfigurationSettable(configurationClass = LeaseManagementConfig.class)
|
||||
|
|
|
|||
|
|
@ -91,14 +91,14 @@ public class MultiLangDaemonConfigurationTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testSetDoPriorityLeaseTaking() {
|
||||
public void testSetEnablePriorityLeaseAssignment() {
|
||||
MultiLangDaemonConfiguration configuration = baseConfiguration();
|
||||
configuration.setDoPriorityLeaseTaking(Boolean.FALSE);
|
||||
configuration.setEnablePriorityLeaseAssignment(false);
|
||||
|
||||
MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration = configuration.resolvedConfiguration(
|
||||
shardRecordProcessorFactory);
|
||||
|
||||
assertThat(resolvedConfiguration.leaseManagementConfig.doPriorityLeaseTaking(), equalTo(false));
|
||||
assertThat(resolvedConfiguration.leaseManagementConfig.enablePriorityLeaseAssignment(), equalTo(false));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
|||
|
|
@ -57,7 +57,7 @@ public class LeaseManagementConfig {
|
|||
public static final long DEFAULT_COMPLETED_LEASE_CLEANUP_INTERVAL_MILLIS = Duration.ofMinutes(5).toMillis();
|
||||
public static final long DEFAULT_GARBAGE_LEASE_CLEANUP_INTERVAL_MILLIS = Duration.ofMinutes(30).toMillis();
|
||||
public static final long DEFAULT_PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 2 * 60 * 1000L;
|
||||
public static final boolean DEFAULT_DO_PRIORITY_LEASE_TAKING = true;
|
||||
public static final boolean DEFAULT_ENABLE_PRIORITY_LEASE_ASSIGNMENT = true;
|
||||
public static final int DEFAULT_CONSECUTIVE_HOLES_FOR_TRIGGERING_LEASE_RECOVERY = 3;
|
||||
|
||||
|
||||
|
|
@ -107,11 +107,12 @@ public class LeaseManagementConfig {
|
|||
* Whether workers should take very expired leases at priority. A very expired lease is when a worker does not
|
||||
* renew its lease in 3 * {@link LeaseManagementConfig#failoverTimeMillis}. Very expired leases will be taken at
|
||||
* priority for a worker which disregards the target leases for the worker but obeys
|
||||
* {@link LeaseManagementConfig#maxLeasesForWorker}
|
||||
* {@link LeaseManagementConfig#maxLeasesForWorker}. New leases for new shards due to shard mutation are
|
||||
* considered to be very expired and taken with priority.
|
||||
*
|
||||
* <p>Default value: true </p>
|
||||
*/
|
||||
private boolean doPriorityLeaseTaking = DEFAULT_DO_PRIORITY_LEASE_TAKING;
|
||||
private boolean enablePriorityLeaseAssignment = DEFAULT_ENABLE_PRIORITY_LEASE_ASSIGNMENT;
|
||||
|
||||
/**
|
||||
* Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks.
|
||||
|
|
@ -381,7 +382,7 @@ public class LeaseManagementConfig {
|
|||
workerIdentifier(),
|
||||
executorService(),
|
||||
failoverTimeMillis(),
|
||||
doPriorityLeaseTaking(),
|
||||
enablePriorityLeaseAssignment(),
|
||||
epsilonMillis(),
|
||||
maxLeasesForWorker(),
|
||||
maxLeasesToStealAtOneTime(),
|
||||
|
|
|
|||
|
|
@ -153,7 +153,7 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
|
|||
final long initialLeaseTableWriteCapacity,
|
||||
final MetricsFactory metricsFactory) {
|
||||
this(leaseRefresher, workerIdentifier, leaseDurationMillis,
|
||||
LeaseManagementConfig.DEFAULT_DO_PRIORITY_LEASE_TAKING, epsilonMillis, maxLeasesForWorker,
|
||||
LeaseManagementConfig.DEFAULT_ENABLE_PRIORITY_LEASE_ASSIGNMENT, epsilonMillis, maxLeasesForWorker,
|
||||
maxLeasesToStealAtOneTime, maxLeaseRenewerThreadCount,
|
||||
TableConstants.DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY,
|
||||
TableConstants.DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY, metricsFactory);
|
||||
|
|
@ -168,8 +168,8 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
|
|||
* Identifies the worker (e.g. useful to track lease ownership)
|
||||
* @param leaseDurationMillis
|
||||
* Duration of a lease
|
||||
* @param doPriorityLeaseTaking
|
||||
* Whether to do priority lease taking for very expired leases
|
||||
* @param enablePriorityLeaseAssignment
|
||||
* Whether to do priority lease assignment for very expired leases
|
||||
* @param epsilonMillis
|
||||
* Allow for some variance when calculating lease expirations
|
||||
* @param maxLeasesForWorker
|
||||
|
|
@ -186,7 +186,7 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
|
|||
public DynamoDBLeaseCoordinator(final LeaseRefresher leaseRefresher,
|
||||
final String workerIdentifier,
|
||||
final long leaseDurationMillis,
|
||||
final boolean doPriorityLeaseTaking,
|
||||
final boolean enablePriorityLeaseAssignment,
|
||||
final long epsilonMillis,
|
||||
final int maxLeasesForWorker,
|
||||
final int maxLeasesToStealAtOneTime,
|
||||
|
|
@ -199,7 +199,7 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
|
|||
this.leaseTaker = new DynamoDBLeaseTaker(leaseRefresher, workerIdentifier, leaseDurationMillis, metricsFactory)
|
||||
.withMaxLeasesForWorker(maxLeasesForWorker)
|
||||
.withMaxLeasesToStealAtOneTime(maxLeasesToStealAtOneTime)
|
||||
.withDoPriorityLeaseTaking(doPriorityLeaseTaking);
|
||||
.withEnablePriorityLeaseAssignment(enablePriorityLeaseAssignment);
|
||||
this.leaseRenewer = new DynamoDBLeaseRenewer(
|
||||
leaseRefresher, workerIdentifier, leaseDurationMillis, leaseRenewalThreadpool, metricsFactory);
|
||||
this.renewerIntervalMillis = getRenewerTakerIntervalMillis(leaseDurationMillis, epsilonMillis);
|
||||
|
|
|
|||
|
|
@ -71,7 +71,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
|
|||
private Function<StreamConfig, ShardDetector> customShardDetectorProvider;
|
||||
|
||||
private final long failoverTimeMillis;
|
||||
private final boolean doPriorityLeaseTaking;
|
||||
private final boolean enablePriorityLeaseAssignment;
|
||||
private final long epsilonMillis;
|
||||
private final int maxLeasesForWorker;
|
||||
private final int maxLeasesToStealAtOneTime;
|
||||
|
|
@ -563,7 +563,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
|
|||
LeaseCleanupConfig leaseCleanupConfig) {
|
||||
this(kinesisClient, dynamoDBClient, tableName,
|
||||
workerIdentifier, executorService, failoverTimeMillis,
|
||||
LeaseManagementConfig.DEFAULT_DO_PRIORITY_LEASE_TAKING, epsilonMillis, maxLeasesForWorker,
|
||||
LeaseManagementConfig.DEFAULT_ENABLE_PRIORITY_LEASE_ASSIGNMENT, epsilonMillis, maxLeasesForWorker,
|
||||
maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion,
|
||||
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
|
||||
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
|
||||
|
|
@ -581,7 +581,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
|
|||
* @param workerIdentifier
|
||||
* @param executorService
|
||||
* @param failoverTimeMillis
|
||||
* @param doPriorityLeaseTaking
|
||||
* @param enablePriorityLeaseAssignment
|
||||
* @param epsilonMillis
|
||||
* @param maxLeasesForWorker
|
||||
* @param maxLeasesToStealAtOneTime
|
||||
|
|
@ -610,7 +610,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
|
|||
public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient,
|
||||
final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier,
|
||||
final ExecutorService executorService, final long failoverTimeMillis,
|
||||
final boolean doPriorityLeaseTaking, final long epsilonMillis,
|
||||
final boolean enablePriorityLeaseAssignment, final long epsilonMillis,
|
||||
final int maxLeasesForWorker, final int maxLeasesToStealAtOneTime, final int maxLeaseRenewalThreads,
|
||||
final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards,
|
||||
final long shardSyncIntervalMillis, final boolean consistentReads, final long listShardsBackoffTimeMillis,
|
||||
|
|
@ -628,7 +628,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
|
|||
this.workerIdentifier = workerIdentifier;
|
||||
this.executorService = executorService;
|
||||
this.failoverTimeMillis = failoverTimeMillis;
|
||||
this.doPriorityLeaseTaking = doPriorityLeaseTaking;
|
||||
this.enablePriorityLeaseAssignment = enablePriorityLeaseAssignment;
|
||||
this.epsilonMillis = epsilonMillis;
|
||||
this.maxLeasesForWorker = maxLeasesForWorker;
|
||||
this.maxLeasesToStealAtOneTime = maxLeasesToStealAtOneTime;
|
||||
|
|
@ -660,8 +660,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
|
|||
public LeaseCoordinator createLeaseCoordinator(@NonNull final MetricsFactory metricsFactory) {
|
||||
return new DynamoDBLeaseCoordinator(this.createLeaseRefresher(),
|
||||
workerIdentifier,
|
||||
failoverTimeMillis,
|
||||
doPriorityLeaseTaking,
|
||||
failoverTimeMillis, enablePriorityLeaseAssignment,
|
||||
epsilonMillis,
|
||||
maxLeasesForWorker,
|
||||
maxLeasesToStealAtOneTime,
|
||||
|
|
|
|||
|
|
@ -69,7 +69,7 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
|
|||
// TODO: Remove these defaults and use the defaults in the config
|
||||
private int maxLeasesForWorker = Integer.MAX_VALUE;
|
||||
private int maxLeasesToStealAtOneTime = 1;
|
||||
private boolean doPriorityLeaseTaking = true;
|
||||
private boolean enablePriorityLeaseAssignment = true;
|
||||
private int veryOldLeaseDurationNanosMultiplier = 3;
|
||||
private long lastScanTimeNanos = 0L;
|
||||
|
||||
|
|
@ -124,8 +124,8 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
|
|||
return this;
|
||||
}
|
||||
|
||||
public DynamoDBLeaseTaker withDoPriorityLeaseTaking(boolean doPriorityLeaseTaking) {
|
||||
this.doPriorityLeaseTaking = doPriorityLeaseTaking;
|
||||
public DynamoDBLeaseTaker withEnablePriorityLeaseAssignment(boolean enablePriorityLeaseAssignment) {
|
||||
this.enablePriorityLeaseAssignment = enablePriorityLeaseAssignment;
|
||||
return this;
|
||||
}
|
||||
|
||||
|
|
@ -446,7 +446,7 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
|
|||
// If there are leases that have been expired for an extended period of
|
||||
// time, take them with priority, disregarding the target (computed
|
||||
// later) but obeying the maximum limit per worker.
|
||||
if (doPriorityLeaseTaking) {
|
||||
if (enablePriorityLeaseAssignment) {
|
||||
long currentNanoTime;
|
||||
try {
|
||||
currentNanoTime = timeProvider.call();
|
||||
|
|
|
|||
|
|
@ -153,28 +153,32 @@ public class DynamoDBLeaseTakerTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void test_disableDoPriorityLeaseTakingGetsCorrectLeases() throws Exception {
|
||||
public void test_disableEnablePriorityLeaseAssignmentGetsCorrectLeases() throws Exception {
|
||||
long veryOldThreshold = MOCK_CURRENT_TIME -
|
||||
(TimeUnit.MILLISECONDS.toNanos(LEASE_DURATION_MILLIS) * DEFAULT_VERY_OLD_LEASE_DURATION_MULTIPLIER);
|
||||
DynamoDBLeaseTaker dynamoDBLeaseTakerWithCustomMultiplier =
|
||||
DynamoDBLeaseTaker dynamoDBLeaseTakerWithDisabledPriorityLeaseAssignment =
|
||||
new DynamoDBLeaseTaker(leaseRefresher, WORKER_IDENTIFIER, LEASE_DURATION_MILLIS, metricsFactory)
|
||||
.withDoPriorityLeaseTaking(false);
|
||||
.withEnablePriorityLeaseAssignment(false);
|
||||
final List<Lease> allLeases = new ArrayList<>();
|
||||
allLeases.add(createLease("foo", "2", MOCK_CURRENT_TIME));
|
||||
allLeases.add(createLease("bar", "3", veryOldThreshold - 1));
|
||||
allLeases.add(createLease("baz", "4", veryOldThreshold + 1));
|
||||
final List<Lease> expiredLeases = allLeases.subList(1, 3);
|
||||
allLeases.add(createLease("bar", "2", MOCK_CURRENT_TIME));
|
||||
allLeases.add(createLease("bar", "3", MOCK_CURRENT_TIME));
|
||||
allLeases.add(createLease("bar", "4", MOCK_CURRENT_TIME));
|
||||
allLeases.add(createLease("baz", "5", veryOldThreshold - 1));
|
||||
allLeases.add(createLease("baz", "6", veryOldThreshold + 1));
|
||||
allLeases.add(createLease(null, "7"));
|
||||
final List<Lease> expiredLeases = allLeases.subList(3, 6);
|
||||
|
||||
dynamoDBLeaseTakerWithCustomMultiplier.allLeases.putAll(
|
||||
dynamoDBLeaseTakerWithDisabledPriorityLeaseAssignment.allLeases.putAll(
|
||||
allLeases.stream().collect(Collectors.toMap(Lease::leaseKey, Function.identity())));
|
||||
when(leaseRefresher.listLeases()).thenReturn(allLeases);
|
||||
when(metricsFactory.createMetrics()).thenReturn(new NullMetricsScope());
|
||||
when(timeProvider.call()).thenReturn(MOCK_CURRENT_TIME);
|
||||
|
||||
Set<Lease> output = dynamoDBLeaseTakerWithCustomMultiplier.computeLeasesToTake(expiredLeases, timeProvider);
|
||||
Set<Lease> output = dynamoDBLeaseTakerWithDisabledPriorityLeaseAssignment.computeLeasesToTake(expiredLeases, timeProvider);
|
||||
final Set<Lease> expectedOutput = new HashSet<>();
|
||||
expectedOutput.add(createLease("bar", "3", veryOldThreshold - 1));
|
||||
expectedOutput.add(createLease("baz", "4", veryOldThreshold + 1));
|
||||
expectedOutput.add(createLease("baz", "5", veryOldThreshold - 1));
|
||||
expectedOutput.add(createLease("baz", "6", veryOldThreshold + 1));
|
||||
expectedOutput.add(createLease(null, "7"));
|
||||
assertEquals(expectedOutput, output);
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue