Add very old leases + update lease taker to always emit metrics (#54)

* Add very old leases + metrics update

* Add conditional emisison for metrics in case of very old leases

* move code blocks around to ensure all metrics are emitted
This commit is contained in:
Renju Radhakrishnan 2020-07-07 16:58:40 -07:00 committed by GitHub
parent 74ffd4060c
commit 347ae9eb68
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -25,7 +25,7 @@ import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.cloudwatch.model.StandardUnit; import software.amazon.awssdk.services.cloudwatch.model.StandardUnit;
import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.annotations.KinesisClientInternalApi;
@ -36,8 +36,8 @@ import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException; import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.MetricsScope;
import software.amazon.kinesis.metrics.MetricsLevel; import software.amazon.kinesis.metrics.MetricsLevel;
import software.amazon.kinesis.metrics.MetricsScope;
import software.amazon.kinesis.metrics.MetricsUtil; import software.amazon.kinesis.metrics.MetricsUtil;
/** /**
@ -48,6 +48,7 @@ import software.amazon.kinesis.metrics.MetricsUtil;
public class DynamoDBLeaseTaker implements LeaseTaker { public class DynamoDBLeaseTaker implements LeaseTaker {
private static final int TAKE_RETRIES = 3; private static final int TAKE_RETRIES = 3;
private static final int SCAN_RETRIES = 1; private static final int SCAN_RETRIES = 1;
private static final int VERY_OLD_LEASE_DURATION_NANOS_MULTIPLIER = 3;
// See note on TAKE_LEASES_DIMENSION(Callable) for why we have this callable. // See note on TAKE_LEASES_DIMENSION(Callable) for why we have this callable.
private static final Callable<Long> SYSTEM_CLOCK_CALLABLE = System::nanoTime; private static final Callable<Long> SYSTEM_CLOCK_CALLABLE = System::nanoTime;
@ -329,16 +330,24 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
Set<Lease> leasesToTake = new HashSet<>(); Set<Lease> leasesToTake = new HashSet<>();
final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, TAKE_LEASES_DIMENSION); final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, TAKE_LEASES_DIMENSION);
MetricsUtil.addWorkerIdentifier(scope, workerIdentifier); MetricsUtil.addWorkerIdentifier(scope, workerIdentifier);
List<Lease> veryOldLeases = new ArrayList<>();
int numLeases = 0;
int numWorkers = 0;
int numLeasesToReachTarget = 0;
int leaseSpillover = 0;
int veryOldLeaseCount = 0;
try { try {
int numLeases = allLeases.size(); numLeases = allLeases.size();
int numWorkers = leaseCounts.size(); numWorkers = leaseCounts.size();
if (numLeases == 0) { if (numLeases == 0) {
// If there are no leases, I shouldn't try to take any. // If there are no leases, I shouldn't try to take any.
return leasesToTake; return leasesToTake;
} }
int target; int target;
if (numWorkers >= numLeases) { if (numWorkers >= numLeases) {
// If we have n leases and n or more workers, each worker can have up to 1 lease, including myself. // If we have n leases and n or more workers, each worker can have up to 1 lease, including myself.
@ -353,7 +362,7 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
// Spill over is the number of leases this worker should have claimed, but did not because it would // Spill over is the number of leases this worker should have claimed, but did not because it would
// exceed the max allowed for this worker. // exceed the max allowed for this worker.
int leaseSpillover = Math.max(0, target - maxLeasesForWorker); leaseSpillover = Math.max(0, target - maxLeasesForWorker);
if (target > maxLeasesForWorker) { if (target > maxLeasesForWorker) {
log.warn( log.warn(
"Worker {} target is {} leases and maxLeasesForWorker is {}. Resetting target to {}," "Worker {} target is {} leases and maxLeasesForWorker is {}. Resetting target to {},"
@ -362,11 +371,29 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
workerIdentifier, target, maxLeasesForWorker, maxLeasesForWorker, leaseSpillover); workerIdentifier, target, maxLeasesForWorker, maxLeasesForWorker, leaseSpillover);
target = maxLeasesForWorker; target = maxLeasesForWorker;
} }
scope.addData("LeaseSpillover", leaseSpillover, StandardUnit.COUNT, MetricsLevel.SUMMARY);
} }
int myCount = leaseCounts.get(workerIdentifier); int myCount = leaseCounts.get(workerIdentifier);
int numLeasesToReachTarget = target - myCount; numLeasesToReachTarget = target - myCount;
int currentLeaseCount = leaseCounts.get(workerIdentifier);
// If there are leases that have been expired for an extended period of
// time, take them with priority, disregarding the target (computed
// later) but obeying the maximum limit per worker.
veryOldLeases = allLeases.values().stream()
.filter(lease -> System.nanoTime() - lease.lastCounterIncrementNanos()
> VERY_OLD_LEASE_DURATION_NANOS_MULTIPLIER * leaseDurationNanos)
.collect(Collectors.toList());
if (!veryOldLeases.isEmpty()) {
Collections.shuffle(veryOldLeases);
veryOldLeaseCount = Math.max(0, Math.min(maxLeasesForWorker - currentLeaseCount, veryOldLeases.size()));
HashSet<Lease> result = new HashSet<>(veryOldLeases.subList(0, veryOldLeaseCount));
if (veryOldLeaseCount > 0) {
log.info("Taking leases that have been expired for a long time: {}", result);
}
return result;
}
if (numLeasesToReachTarget <= 0) { if (numLeasesToReachTarget <= 0) {
// If we don't need anything, return the empty set. // If we don't need anything, return the empty set.
@ -376,7 +403,6 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
// Shuffle expiredLeases so workers don't all try to contend for the same leases. // Shuffle expiredLeases so workers don't all try to contend for the same leases.
Collections.shuffle(expiredLeases); Collections.shuffle(expiredLeases);
int originalExpiredLeasesSize = expiredLeases.size();
if (expiredLeases.size() > 0) { if (expiredLeases.size() > 0) {
// If we have expired leases, get up to <needed> leases from expiredLeases // If we have expired leases, get up to <needed> leases from expiredLeases
for (; numLeasesToReachTarget > 0 && expiredLeases.size() > 0; numLeasesToReachTarget--) { for (; numLeasesToReachTarget > 0 && expiredLeases.size() > 0; numLeasesToReachTarget--) {
@ -397,16 +423,19 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
log.info( log.info(
"Worker {} saw {} total leases, {} available leases, {} " "Worker {} saw {} total leases, {} available leases, {} "
+ "workers. Target is {} leases, I have {} leases, I will take {} leases", + "workers. Target is {} leases, I have {} leases, I will take {} leases",
workerIdentifier, numLeases, originalExpiredLeasesSize, numWorkers, target, myCount, workerIdentifier, numLeases, expiredLeases.size(), numWorkers, target, myCount,
leasesToTake.size()); leasesToTake.size());
} }
scope.addData("TotalLeases", numLeases, StandardUnit.COUNT, MetricsLevel.DETAILED);
scope.addData("ExpiredLeases", originalExpiredLeasesSize, StandardUnit.COUNT, MetricsLevel.SUMMARY);
scope.addData("NumWorkers", numWorkers, StandardUnit.COUNT, MetricsLevel.SUMMARY);
scope.addData("NeededLeases", numLeasesToReachTarget, StandardUnit.COUNT, MetricsLevel.DETAILED);
scope.addData("LeasesToTake", leasesToTake.size(), StandardUnit.COUNT, MetricsLevel.DETAILED);
} finally { } finally {
scope.addData("ExpiredLeases", expiredLeases.size(), StandardUnit.COUNT, MetricsLevel.SUMMARY);
scope.addData("LeaseSpillover", leaseSpillover, StandardUnit.COUNT, MetricsLevel.SUMMARY);
scope.addData("LeasesToTake", leasesToTake.size(), StandardUnit.COUNT, MetricsLevel.DETAILED);
scope.addData("NeededLeases", Math.max(numLeasesToReachTarget, 0), StandardUnit.COUNT, MetricsLevel.DETAILED);
scope.addData("NumWorkers", numWorkers, StandardUnit.COUNT, MetricsLevel.SUMMARY);
scope.addData("TotalLeases", numLeases, StandardUnit.COUNT, MetricsLevel.DETAILED);
scope.addData("VeryOldLeases", veryOldLeaseCount, StandardUnit.COUNT, MetricsLevel.SUMMARY);
MetricsUtil.endScope(scope); MetricsUtil.endScope(scope);
} }