From 347ae9eb68a66542d6d1e25507eebacf6de9bcca Mon Sep 17 00:00:00 2001 From: Renju Radhakrishnan Date: Tue, 7 Jul 2020 16:58:40 -0700 Subject: [PATCH] Add very old leases + update lease taker to always emit metrics (#54) * Add very old leases + metrics update * Add conditional emission for metrics in case of very old leases * move code blocks around to ensure all metrics are emitted --- .../leases/dynamodb/DynamoDBLeaseTaker.java | 67 +++++++++++++------ 1 file changed, 48 insertions(+), 19 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java index 41147947..3ec2de22 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java @@ -25,7 +25,7 @@ import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; - +import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import software.amazon.awssdk.services.cloudwatch.model.StandardUnit; import software.amazon.kinesis.annotations.KinesisClientInternalApi; @@ -36,8 +36,8 @@ import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.InvalidStateException; import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; import software.amazon.kinesis.metrics.MetricsFactory; -import software.amazon.kinesis.metrics.MetricsScope; import software.amazon.kinesis.metrics.MetricsLevel; +import software.amazon.kinesis.metrics.MetricsScope; import software.amazon.kinesis.metrics.MetricsUtil; /** @@ -48,6 +48,7 @@ import software.amazon.kinesis.metrics.MetricsUtil; public class DynamoDBLeaseTaker implements LeaseTaker { private static final int 
TAKE_RETRIES = 3; private static final int SCAN_RETRIES = 1; + private static final int VERY_OLD_LEASE_DURATION_NANOS_MULTIPLIER = 3; // See note on TAKE_LEASES_DIMENSION(Callable) for why we have this callable. private static final Callable SYSTEM_CLOCK_CALLABLE = System::nanoTime; @@ -329,31 +330,39 @@ public class DynamoDBLeaseTaker implements LeaseTaker { Set leasesToTake = new HashSet<>(); final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, TAKE_LEASES_DIMENSION); MetricsUtil.addWorkerIdentifier(scope, workerIdentifier); + List veryOldLeases = new ArrayList<>(); + + int numLeases = 0; + int numWorkers = 0; + int numLeasesToReachTarget = 0; + int leaseSpillover = 0; + int veryOldLeaseCount = 0; try { - int numLeases = allLeases.size(); - int numWorkers = leaseCounts.size(); + numLeases = allLeases.size(); + numWorkers = leaseCounts.size(); if (numLeases == 0) { // If there are no leases, I shouldn't try to take any. return leasesToTake; } + int target; if (numWorkers >= numLeases) { // If we have n leases and n or more workers, each worker can have up to 1 lease, including myself. target = 1; } else { - /* - * numWorkers must be < numLeases. - * - * Our target for each worker is numLeases / numWorkers (+1 if numWorkers doesn't evenly divide numLeases) - */ + /* + * numWorkers must be < numLeases. + * + * Our target for each worker is numLeases / numWorkers (+1 if numWorkers doesn't evenly divide numLeases) + */ target = numLeases / numWorkers + (numLeases % numWorkers == 0 ? 0 : 1); // Spill over is the number of leases this worker should have claimed, but did not because it would // exceed the max allowed for this worker. - int leaseSpillover = Math.max(0, target - maxLeasesForWorker); + leaseSpillover = Math.max(0, target - maxLeasesForWorker); if (target > maxLeasesForWorker) { log.warn( "Worker {} target is {} leases and maxLeasesForWorker is {}. 
Resetting target to {}," @@ -362,11 +371,29 @@ public class DynamoDBLeaseTaker implements LeaseTaker { workerIdentifier, target, maxLeasesForWorker, maxLeasesForWorker, leaseSpillover); target = maxLeasesForWorker; } - scope.addData("LeaseSpillover", leaseSpillover, StandardUnit.COUNT, MetricsLevel.SUMMARY); } int myCount = leaseCounts.get(workerIdentifier); - int numLeasesToReachTarget = target - myCount; + numLeasesToReachTarget = target - myCount; + + int currentLeaseCount = leaseCounts.get(workerIdentifier); + // If there are leases that have been expired for an extended period of + // time, take them with priority, disregarding the target (computed + // later) but obeying the maximum limit per worker. + veryOldLeases = allLeases.values().stream() + .filter(lease -> System.nanoTime() - lease.lastCounterIncrementNanos() + > VERY_OLD_LEASE_DURATION_NANOS_MULTIPLIER * leaseDurationNanos) + .collect(Collectors.toList()); + + if (!veryOldLeases.isEmpty()) { + Collections.shuffle(veryOldLeases); + veryOldLeaseCount = Math.max(0, Math.min(maxLeasesForWorker - currentLeaseCount, veryOldLeases.size())); + HashSet result = new HashSet<>(veryOldLeases.subList(0, veryOldLeaseCount)); + if (veryOldLeaseCount > 0) { + log.info("Taking leases that have been expired for a long time: {}", result); + } + return result; + } if (numLeasesToReachTarget <= 0) { // If we don't need anything, return the empty set. @@ -376,7 +403,6 @@ public class DynamoDBLeaseTaker implements LeaseTaker { // Shuffle expiredLeases so workers don't all try to contend for the same leases. 
Collections.shuffle(expiredLeases); - int originalExpiredLeasesSize = expiredLeases.size(); if (expiredLeases.size() > 0) { // If we have expired leases, get up to leases from expiredLeases for (; numLeasesToReachTarget > 0 && expiredLeases.size() > 0; numLeasesToReachTarget--) { @@ -397,16 +423,19 @@ public class DynamoDBLeaseTaker implements LeaseTaker { log.info( "Worker {} saw {} total leases, {} available leases, {} " + "workers. Target is {} leases, I have {} leases, I will take {} leases", - workerIdentifier, numLeases, originalExpiredLeasesSize, numWorkers, target, myCount, + workerIdentifier, numLeases, expiredLeases.size(), numWorkers, target, myCount, leasesToTake.size()); } - scope.addData("TotalLeases", numLeases, StandardUnit.COUNT, MetricsLevel.DETAILED); - scope.addData("ExpiredLeases", originalExpiredLeasesSize, StandardUnit.COUNT, MetricsLevel.SUMMARY); - scope.addData("NumWorkers", numWorkers, StandardUnit.COUNT, MetricsLevel.SUMMARY); - scope.addData("NeededLeases", numLeasesToReachTarget, StandardUnit.COUNT, MetricsLevel.DETAILED); - scope.addData("LeasesToTake", leasesToTake.size(), StandardUnit.COUNT, MetricsLevel.DETAILED); } finally { + scope.addData("ExpiredLeases", expiredLeases.size(), StandardUnit.COUNT, MetricsLevel.SUMMARY); + scope.addData("LeaseSpillover", leaseSpillover, StandardUnit.COUNT, MetricsLevel.SUMMARY); + scope.addData("LeasesToTake", leasesToTake.size(), StandardUnit.COUNT, MetricsLevel.DETAILED); + scope.addData("NeededLeases", Math.max(numLeasesToReachTarget, 0), StandardUnit.COUNT, MetricsLevel.DETAILED); + scope.addData("NumWorkers", numWorkers, StandardUnit.COUNT, MetricsLevel.SUMMARY); + scope.addData("TotalLeases", numLeases, StandardUnit.COUNT, MetricsLevel.DETAILED); + scope.addData("VeryOldLeases", veryOldLeaseCount, StandardUnit.COUNT, MetricsLevel.SUMMARY); + MetricsUtil.endScope(scope); }