Treat unassigned leases (as well as expired ones) as available-to-be-taken

If a lease is 'unassigned' (it has no lease owner) then it should be
considered available for taking in `DynamoDBLeaseTaker`. Prior to this
change, the only ways `DynamoDBLeaseTaker` could take leases for a
scheduler were either by incremental lease stealing, or by waiting for the
lease to expire by not having been updated in `failoverTimeMillis` - which
could be slow if `failoverTimeMillis` was set reasonably high (with it set
to just 30s, I've seen new instances take over 3 minutes to take all leases
from old instances in a deployment).

This would be one half of a fix for
https://github.com/awslabs/amazon-kinesis-client/issues/845 - the other
half of the fix is invoking `evictLease()` (setting the lease owner to
null) on graceful shutdown of a scheduler.
This commit is contained in:
Roberto Tyley 2021-08-08 10:44:22 +01:00 committed by Roberto Tyley
parent fda91d9070
commit f9d50772d5
3 changed files with 58 additions and 62 deletions

View file

@ -195,6 +195,15 @@ public class Lease {
} }
} }
/**
* @param leaseDurationNanos duration of lease in nanoseconds
* @param asOfNanos time in nanoseconds to check expiration as-of
* @return true if lease is available (unassigned or expired as-of given time), false otherwise
*/
public boolean isAvailable(long leaseDurationNanos, long asOfNanos) {
return isUnassigned() || isExpired(leaseDurationNanos, asOfNanos);
}
/** /**
* Sets lastCounterIncrementNanos * Sets lastCounterIncrementNanos
* *
@ -310,6 +319,10 @@ public class Lease {
this.leaseOwner = leaseOwner; this.leaseOwner = leaseOwner;
} }
public boolean isUnassigned() {
return leaseOwner == null;
}
/** /**
* Returns a deep copy of this object. Type-unsafe - there aren't good mechanisms for copy-constructing generics. * Returns a deep copy of this object. Type-unsafe - there aren't good mechanisms for copy-constructing generics.
* *

View file

@ -28,6 +28,7 @@ import java.util.Set;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.cloudwatch.model.StandardUnit; import software.amazon.awssdk.services.cloudwatch.model.StandardUnit;
import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.annotations.KinesisClientInternalApi;
@ -44,6 +45,10 @@ import software.amazon.kinesis.metrics.MetricsUtil;
import static software.amazon.kinesis.common.CommonCalculations.getRenewerTakerIntervalMillis; import static software.amazon.kinesis.common.CommonCalculations.getRenewerTakerIntervalMillis;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.summingInt;
import static java.util.stream.Collectors.toList;
/** /**
* An implementation of {@link LeaseTaker} that uses DynamoDB via {@link LeaseRefresher}. * An implementation of {@link LeaseTaker} that uses DynamoDB via {@link LeaseRefresher}.
*/ */
@ -191,9 +196,7 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
return takenLeases; return takenLeases;
} }
List<Lease> expiredLeases = getExpiredLeases(); Set<Lease> leasesToTake = computeLeasesToTake(getAvailableLeases());
Set<Lease> leasesToTake = computeLeasesToTake(expiredLeases);
leasesToTake = updateStaleLeasesWithLatestState(updateAllLeasesTotalTimeMillis, leasesToTake); leasesToTake = updateStaleLeasesWithLatestState(updateAllLeasesTotalTimeMillis, leasesToTake);
Set<String> untakenLeaseKeys = new HashSet<>(); Set<String> untakenLeaseKeys = new HashSet<>();
@ -357,50 +360,39 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
} }
/** /**
* @return list of leases that were expired as of our last scan. * @return list of leases that were available (unassigned or expired) as of our last scan.
*/ */
private List<Lease> getExpiredLeases() { private List<Lease> getAvailableLeases() {
List<Lease> expiredLeases = new ArrayList<>(); return allLeases.values().stream().filter(
lease -> lease.isAvailable(leaseDurationNanos, lastScanTimeNanos)
for (Lease lease : allLeases.values()) { ).collect(toList());
if (lease.isExpired(leaseDurationNanos, lastScanTimeNanos)) {
expiredLeases.add(lease);
}
}
return expiredLeases;
} }
/** /**
* Compute the number of leases I should try to take based on the state of the system. * Compute the number of leases I should try to take based on the state of the system.
* *
* @param expiredLeases list of leases we determined to be expired * @param availableLeases list of leases we determined to be available (unassigned or expired) * @return set of leases to take.
* @return set of leases to take.
*/ */
private Set<Lease> computeLeasesToTake(List<Lease> expiredLeases) { private Set<Lease> computeLeasesToTake(List<Lease> availableLeases) {
Map<String, Integer> leaseCounts = computeLeaseCounts(expiredLeases); Map<String, Integer> leaseCountPerWorker = computeActiveLeaseCountsByWorker(availableLeases);
Set<Lease> leasesToTake = new HashSet<>(); Set<Lease> leasesToTake = new HashSet<>();
final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, TAKE_LEASES_DIMENSION); final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, TAKE_LEASES_DIMENSION);
MetricsUtil.addWorkerIdentifier(scope, workerIdentifier); MetricsUtil.addWorkerIdentifier(scope, workerIdentifier);
List<Lease> veryOldLeases = new ArrayList<>(); List<Lease> veryOldLeases = new ArrayList<>();
final int numAvailableLeases = expiredLeases.size(); final int numAvailableLeases = availableLeases.size();
int numLeases = 0; final int numLeases = allLeases.size();
int numWorkers = 0; final int numWorkers = leaseCountPerWorker.size();
int numLeasesToReachTarget = 0; int numLeasesToReachTarget = 0;
int leaseSpillover = 0; int leaseSpillover = 0;
int veryOldLeaseCount = 0; int veryOldLeaseCount = 0;
try { try {
numLeases = allLeases.size();
numWorkers = leaseCounts.size();
if (numLeases == 0) { if (numLeases == 0) {
// If there are no leases, I shouldn't try to take any. // If there are no leases, I shouldn't try to take any.
return leasesToTake; return leasesToTake;
} }
int target; int target;
if (numWorkers >= numLeases) { if (numWorkers >= numLeases) {
// If we have n leases and n or more workers, each worker can have up to 1 lease, including myself. // If we have n leases and n or more workers, each worker can have up to 1 lease, including myself.
@ -426,17 +418,17 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
} }
} }
int myCount = leaseCounts.get(workerIdentifier); int myCount = leaseCountPerWorker.get(workerIdentifier);
numLeasesToReachTarget = target - myCount; numLeasesToReachTarget = target - myCount;
int currentLeaseCount = leaseCounts.get(workerIdentifier); int currentLeaseCount = leaseCountPerWorker.get(workerIdentifier);
// If there are leases that have been expired for an extended period of // If there are leases that have been expired for an extended period of
// time, take them with priority, disregarding the target (computed // time, take them with priority, disregarding the target (computed
// later) but obeying the maximum limit per worker. // later) but obeying the maximum limit per worker.
veryOldLeases = allLeases.values().stream() veryOldLeases = allLeases.values().stream()
.filter(lease -> System.nanoTime() - lease.lastCounterIncrementNanos() .filter(lease -> System.nanoTime() - lease.lastCounterIncrementNanos()
> veryOldLeaseDurationNanosMultiplier * leaseDurationNanos) > veryOldLeaseDurationNanosMultiplier * leaseDurationNanos)
.collect(Collectors.toList()); .collect(toList());
if (!veryOldLeases.isEmpty()) { if (!veryOldLeases.isEmpty()) {
Collections.shuffle(veryOldLeases); Collections.shuffle(veryOldLeases);
@ -453,17 +445,17 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
return leasesToTake; return leasesToTake;
} }
// Shuffle expiredLeases so workers don't all try to contend for the same leases. // Shuffle availableLeases so workers don't all try to contend for the same leases.
Collections.shuffle(expiredLeases); Collections.shuffle(availableLeases);
if (expiredLeases.size() > 0) { if (availableLeases.size() > 0) {
// If we have expired leases, get up to <needed> leases from expiredLeases // If we have available leases, get up to <needed> leases from availableLeases
for (; numLeasesToReachTarget > 0 && expiredLeases.size() > 0; numLeasesToReachTarget--) { for (; numLeasesToReachTarget > 0 && availableLeases.size() > 0; numLeasesToReachTarget--) {
leasesToTake.add(expiredLeases.remove(0)); leasesToTake.add(availableLeases.remove(0));
} }
} else { } else {
// If there are no expired leases and we need a lease, consider stealing. // If there are no expired leases and we need a lease, consider stealing.
List<Lease> leasesToSteal = chooseLeasesToSteal(leaseCounts, numLeasesToReachTarget, target); List<Lease> leasesToSteal = chooseLeasesToSteal(leaseCountPerWorker, numLeasesToReachTarget, target);
for (Lease leaseToSteal : leasesToSteal) { for (Lease leaseToSteal : leasesToSteal) {
log.info("Worker {} needed {} leases but none were expired, so it will steal lease {} from {}", log.info("Worker {} needed {} leases but none were expired, so it will steal lease {} from {}",
workerIdentifier, numLeasesToReachTarget, leaseToSteal.leaseKey(), workerIdentifier, numLeasesToReachTarget, leaseToSteal.leaseKey(),
@ -481,7 +473,7 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
} }
} finally { } finally {
scope.addData("ExpiredLeases", expiredLeases.size(), StandardUnit.COUNT, MetricsLevel.SUMMARY); scope.addData("ExpiredLeases", numAvailableLeases, StandardUnit.COUNT, MetricsLevel.SUMMARY);
scope.addData("LeaseSpillover", leaseSpillover, StandardUnit.COUNT, MetricsLevel.SUMMARY); scope.addData("LeaseSpillover", leaseSpillover, StandardUnit.COUNT, MetricsLevel.SUMMARY);
scope.addData("LeasesToTake", leasesToTake.size(), StandardUnit.COUNT, MetricsLevel.DETAILED); scope.addData("LeasesToTake", leasesToTake.size(), StandardUnit.COUNT, MetricsLevel.DETAILED);
scope.addData("NeededLeases", Math.max(numLeasesToReachTarget, 0), StandardUnit.COUNT, MetricsLevel.DETAILED); scope.addData("NeededLeases", Math.max(numLeasesToReachTarget, 0), StandardUnit.COUNT, MetricsLevel.DETAILED);
@ -574,36 +566,28 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
} }
/** /**
* Count leases by host. Always includes myself, but otherwise only includes hosts that are currently holding * Count leases by host, excluding all leases considered 'available to be taken' (ie unassigned or expired)
* leases. * using the supplied list. Always includes myself, but otherwise only includes hosts that are actively
* updating leases. There will be no entry for unassigned leases - ie the returned map will only have
* entries for non-null worker ids - so a good active-worker count is given by the size of the map.
* *
* @param expiredLeases list of leases that are currently expired * @param excludingAvailableLeases list of 'available' (unassigned/expired) leases to exclude from the count
* @return map of workerIdentifier to lease count * @return map of workerIdentifier to lease count
*/ */
@VisibleForTesting @VisibleForTesting
Map<String, Integer> computeLeaseCounts(List<Lease> expiredLeases) { Map<String, Integer> computeActiveLeaseCountsByWorker(List<Lease> excludingAvailableLeases) {
Map<String, Integer> leaseCounts = new HashMap<>();
// The set will give much faster lookup than the original list, an // The set will give much faster lookup than the original list, an
// important optimization when the list is large // important optimization when the list is large
Set<Lease> expiredLeasesSet = new HashSet<>(expiredLeases); Set<Lease> availableLeasesSet = new HashSet<>(excludingAvailableLeases);
// Compute the number of leases per worker by looking through allLeases and ignoring leases that have expired. Map<String, Integer> activeLeaseCountsByWorker = allLeases.values().stream()
for (Lease lease : allLeases.values()) { .filter(lease -> !lease.isUnassigned() && !availableLeasesSet.contains(lease))
if (!expiredLeasesSet.contains(lease)) { .collect(groupingBy(Lease::leaseOwner, summingInt(lease -> 1)));
String leaseOwner = lease.leaseOwner();
Integer oldCount = leaseCounts.get(leaseOwner);
if (oldCount == null) {
leaseCounts.put(leaseOwner, 1);
} else {
leaseCounts.put(leaseOwner, oldCount + 1);
}
}
}
// If I have no leases, I wasn't represented in leaseCounts. Let's fix that. // If I have no leases, I won't be represented in activeLeaseCountsByWorker. Let's fix that.
leaseCounts.putIfAbsent(workerIdentifier, 0); activeLeaseCountsByWorker.putIfAbsent(workerIdentifier, 0);
return leaseCounts; return activeLeaseCountsByWorker;
} }
/** /**

View file

@ -111,7 +111,7 @@ public class DynamoDBLeaseTakerTest {
} }
@Test @Test
public void test_computeLeaseCounts_noExpiredLease() throws Exception { public void test_computeActiveLeaseCountsByWorker_noAvailableLeases() throws Exception {
final List<Lease> leases = new ImmutableList.Builder<Lease>() final List<Lease> leases = new ImmutableList.Builder<Lease>()
.add(createLease(null, "1")) .add(createLease(null, "1"))
.add(createLease("foo", "2")) .add(createLease("foo", "2"))
@ -125,10 +125,9 @@ public class DynamoDBLeaseTakerTest {
when(metricsFactory.createMetrics()).thenReturn(new NullMetricsScope()); when(metricsFactory.createMetrics()).thenReturn(new NullMetricsScope());
when(timeProvider.call()).thenReturn(1000L); when(timeProvider.call()).thenReturn(1000L);
final Map<String, Integer> actualOutput = dynamoDBLeaseTaker.computeLeaseCounts(ImmutableList.of()); final Map<String, Integer> actualOutput = dynamoDBLeaseTaker.computeActiveLeaseCountsByWorker(ImmutableList.of());
final Map<String, Integer> expectedOutput = new HashMap<>(); final Map<String, Integer> expectedOutput = new HashMap<>();
expectedOutput.put(null, 1);
expectedOutput.put("foo", 1); expectedOutput.put("foo", 1);
expectedOutput.put("bar", 1); expectedOutput.put("bar", 1);
expectedOutput.put("baz", 1); expectedOutput.put("baz", 1);
@ -136,7 +135,7 @@ public class DynamoDBLeaseTakerTest {
} }
@Test @Test
public void test_computeLeaseCounts_withExpiredLease() throws Exception { public void test_computeActiveLeaseCountsByWorker_withAvailableLeases() throws Exception {
final List<Lease> leases = new ImmutableList.Builder<Lease>() final List<Lease> leases = new ImmutableList.Builder<Lease>()
.add(createLease("foo", "2")) .add(createLease("foo", "2"))
.add(createLease("bar", "3")) .add(createLease("bar", "3"))
@ -149,7 +148,7 @@ public class DynamoDBLeaseTakerTest {
when(metricsFactory.createMetrics()).thenReturn(new NullMetricsScope()); when(metricsFactory.createMetrics()).thenReturn(new NullMetricsScope());
when(timeProvider.call()).thenReturn(1000L); when(timeProvider.call()).thenReturn(1000L);
final Map<String, Integer> actualOutput = dynamoDBLeaseTaker.computeLeaseCounts(leases); final Map<String, Integer> actualOutput = dynamoDBLeaseTaker.computeActiveLeaseCountsByWorker(leases);
final Map<String, Integer> expectedOutput = new HashMap<>(); final Map<String, Integer> expectedOutput = new HashMap<>();
expectedOutput.put("foo", 0); expectedOutput.put("foo", 0);