Merge f9d50772d5 into 34f19c5a7b
This commit is contained in:
commit
6f4ce2d0bc
3 changed files with 58 additions and 62 deletions
|
|
@ -195,6 +195,15 @@ public class Lease {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param leaseDurationNanos duration of lease in nanoseconds
|
||||||
|
* @param asOfNanos time in nanoseconds to check expiration as-of
|
||||||
|
* @return true if lease is available (unassigned or expired as-of given time), false otherwise
|
||||||
|
*/
|
||||||
|
public boolean isAvailable(long leaseDurationNanos, long asOfNanos) {
|
||||||
|
return isUnassigned() || isExpired(leaseDurationNanos, asOfNanos);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sets lastCounterIncrementNanos
|
* Sets lastCounterIncrementNanos
|
||||||
*
|
*
|
||||||
|
|
@ -310,6 +319,10 @@ public class Lease {
|
||||||
this.leaseOwner = leaseOwner;
|
this.leaseOwner = leaseOwner;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isUnassigned() {
|
||||||
|
return leaseOwner == null;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns a deep copy of this object. Type-unsafe - there aren't good mechanisms for copy-constructing generics.
|
* Returns a deep copy of this object. Type-unsafe - there aren't good mechanisms for copy-constructing generics.
|
||||||
*
|
*
|
||||||
|
|
|
||||||
|
|
@ -28,6 +28,7 @@ import java.util.Set;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import software.amazon.awssdk.services.cloudwatch.model.StandardUnit;
|
import software.amazon.awssdk.services.cloudwatch.model.StandardUnit;
|
||||||
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
|
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
|
||||||
|
|
@ -44,6 +45,10 @@ import software.amazon.kinesis.metrics.MetricsUtil;
|
||||||
|
|
||||||
import static software.amazon.kinesis.common.CommonCalculations.getRenewerTakerIntervalMillis;
|
import static software.amazon.kinesis.common.CommonCalculations.getRenewerTakerIntervalMillis;
|
||||||
|
|
||||||
|
import static java.util.stream.Collectors.groupingBy;
|
||||||
|
import static java.util.stream.Collectors.summingInt;
|
||||||
|
import static java.util.stream.Collectors.toList;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An implementation of {@link LeaseTaker} that uses DynamoDB via {@link LeaseRefresher}.
|
* An implementation of {@link LeaseTaker} that uses DynamoDB via {@link LeaseRefresher}.
|
||||||
*/
|
*/
|
||||||
|
|
@ -191,9 +196,7 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
|
||||||
return takenLeases;
|
return takenLeases;
|
||||||
}
|
}
|
||||||
|
|
||||||
List<Lease> expiredLeases = getExpiredLeases();
|
Set<Lease> leasesToTake = computeLeasesToTake(getAvailableLeases());
|
||||||
|
|
||||||
Set<Lease> leasesToTake = computeLeasesToTake(expiredLeases);
|
|
||||||
leasesToTake = updateStaleLeasesWithLatestState(updateAllLeasesTotalTimeMillis, leasesToTake);
|
leasesToTake = updateStaleLeasesWithLatestState(updateAllLeasesTotalTimeMillis, leasesToTake);
|
||||||
|
|
||||||
Set<String> untakenLeaseKeys = new HashSet<>();
|
Set<String> untakenLeaseKeys = new HashSet<>();
|
||||||
|
|
@ -358,50 +361,39 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return list of leases that were expired as of our last scan.
|
* @return list of leases that were available (unassigned or expired) as of our last scan.
|
||||||
*/
|
*/
|
||||||
private List<Lease> getExpiredLeases() {
|
private List<Lease> getAvailableLeases() {
|
||||||
List<Lease> expiredLeases = new ArrayList<>();
|
return allLeases.values().stream().filter(
|
||||||
|
lease -> lease.isAvailable(leaseDurationNanos, lastScanTimeNanos)
|
||||||
for (Lease lease : allLeases.values()) {
|
).collect(toList());
|
||||||
if (lease.isExpired(leaseDurationNanos, lastScanTimeNanos)) {
|
|
||||||
expiredLeases.add(lease);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return expiredLeases;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Compute the number of leases I should try to take based on the state of the system.
|
* Compute the number of leases I should try to take based on the state of the system.
|
||||||
*
|
*
|
||||||
* @param expiredLeases list of leases we determined to be expired
|
* @param availableLeases list of leases we determined to be available (unassigned or expired) * @return set of leases to take.
|
||||||
* @return set of leases to take.
|
|
||||||
*/
|
*/
|
||||||
private Set<Lease> computeLeasesToTake(List<Lease> expiredLeases) {
|
private Set<Lease> computeLeasesToTake(List<Lease> availableLeases) {
|
||||||
Map<String, Integer> leaseCounts = computeLeaseCounts(expiredLeases);
|
Map<String, Integer> leaseCountPerWorker = computeActiveLeaseCountsByWorker(availableLeases);
|
||||||
Set<Lease> leasesToTake = new HashSet<>();
|
Set<Lease> leasesToTake = new HashSet<>();
|
||||||
final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, TAKE_LEASES_DIMENSION);
|
final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, TAKE_LEASES_DIMENSION);
|
||||||
MetricsUtil.addWorkerIdentifier(scope, workerIdentifier);
|
MetricsUtil.addWorkerIdentifier(scope, workerIdentifier);
|
||||||
List<Lease> veryOldLeases = new ArrayList<>();
|
List<Lease> veryOldLeases = new ArrayList<>();
|
||||||
|
|
||||||
final int numAvailableLeases = expiredLeases.size();
|
final int numAvailableLeases = availableLeases.size();
|
||||||
int numLeases = 0;
|
final int numLeases = allLeases.size();
|
||||||
int numWorkers = 0;
|
final int numWorkers = leaseCountPerWorker.size();
|
||||||
int numLeasesToReachTarget = 0;
|
int numLeasesToReachTarget = 0;
|
||||||
int leaseSpillover = 0;
|
int leaseSpillover = 0;
|
||||||
int veryOldLeaseCount = 0;
|
int veryOldLeaseCount = 0;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
numLeases = allLeases.size();
|
|
||||||
numWorkers = leaseCounts.size();
|
|
||||||
|
|
||||||
if (numLeases == 0) {
|
if (numLeases == 0) {
|
||||||
// If there are no leases, I shouldn't try to take any.
|
// If there are no leases, I shouldn't try to take any.
|
||||||
return leasesToTake;
|
return leasesToTake;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int target;
|
int target;
|
||||||
if (numWorkers >= numLeases) {
|
if (numWorkers >= numLeases) {
|
||||||
// If we have n leases and n or more workers, each worker can have up to 1 lease, including myself.
|
// If we have n leases and n or more workers, each worker can have up to 1 lease, including myself.
|
||||||
|
|
@ -427,17 +419,17 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int myCount = leaseCounts.get(workerIdentifier);
|
int myCount = leaseCountPerWorker.get(workerIdentifier);
|
||||||
numLeasesToReachTarget = target - myCount;
|
numLeasesToReachTarget = target - myCount;
|
||||||
|
|
||||||
int currentLeaseCount = leaseCounts.get(workerIdentifier);
|
int currentLeaseCount = leaseCountPerWorker.get(workerIdentifier);
|
||||||
// If there are leases that have been expired for an extended period of
|
// If there are leases that have been expired for an extended period of
|
||||||
// time, take them with priority, disregarding the target (computed
|
// time, take them with priority, disregarding the target (computed
|
||||||
// later) but obeying the maximum limit per worker.
|
// later) but obeying the maximum limit per worker.
|
||||||
veryOldLeases = allLeases.values().stream()
|
veryOldLeases = allLeases.values().stream()
|
||||||
.filter(lease -> System.nanoTime() - lease.lastCounterIncrementNanos()
|
.filter(lease -> System.nanoTime() - lease.lastCounterIncrementNanos()
|
||||||
> veryOldLeaseDurationNanosMultiplier * leaseDurationNanos)
|
> veryOldLeaseDurationNanosMultiplier * leaseDurationNanos)
|
||||||
.collect(Collectors.toList());
|
.collect(toList());
|
||||||
|
|
||||||
if (!veryOldLeases.isEmpty()) {
|
if (!veryOldLeases.isEmpty()) {
|
||||||
Collections.shuffle(veryOldLeases);
|
Collections.shuffle(veryOldLeases);
|
||||||
|
|
@ -454,17 +446,17 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
|
||||||
return leasesToTake;
|
return leasesToTake;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Shuffle expiredLeases so workers don't all try to contend for the same leases.
|
// Shuffle availableLeases so workers don't all try to contend for the same leases.
|
||||||
Collections.shuffle(expiredLeases);
|
Collections.shuffle(availableLeases);
|
||||||
|
|
||||||
if (expiredLeases.size() > 0) {
|
if (availableLeases.size() > 0) {
|
||||||
// If we have expired leases, get up to <needed> leases from expiredLeases
|
// If we have available leases, get up to <needed> leases from availableLeases
|
||||||
for (; numLeasesToReachTarget > 0 && expiredLeases.size() > 0; numLeasesToReachTarget--) {
|
for (; numLeasesToReachTarget > 0 && availableLeases.size() > 0; numLeasesToReachTarget--) {
|
||||||
leasesToTake.add(expiredLeases.remove(0));
|
leasesToTake.add(availableLeases.remove(0));
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// If there are no expired leases and we need a lease, consider stealing.
|
// If there are no expired leases and we need a lease, consider stealing.
|
||||||
List<Lease> leasesToSteal = chooseLeasesToSteal(leaseCounts, numLeasesToReachTarget, target);
|
List<Lease> leasesToSteal = chooseLeasesToSteal(leaseCountPerWorker, numLeasesToReachTarget, target);
|
||||||
for (Lease leaseToSteal : leasesToSteal) {
|
for (Lease leaseToSteal : leasesToSteal) {
|
||||||
log.info("Worker {} needed {} leases but none were expired, so it will steal lease {} from {}",
|
log.info("Worker {} needed {} leases but none were expired, so it will steal lease {} from {}",
|
||||||
workerIdentifier, numLeasesToReachTarget, leaseToSteal.leaseKey(),
|
workerIdentifier, numLeasesToReachTarget, leaseToSteal.leaseKey(),
|
||||||
|
|
@ -482,7 +474,7 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
|
||||||
}
|
}
|
||||||
|
|
||||||
} finally {
|
} finally {
|
||||||
scope.addData("ExpiredLeases", expiredLeases.size(), StandardUnit.COUNT, MetricsLevel.SUMMARY);
|
scope.addData("ExpiredLeases", numAvailableLeases, StandardUnit.COUNT, MetricsLevel.SUMMARY);
|
||||||
scope.addData("LeaseSpillover", leaseSpillover, StandardUnit.COUNT, MetricsLevel.SUMMARY);
|
scope.addData("LeaseSpillover", leaseSpillover, StandardUnit.COUNT, MetricsLevel.SUMMARY);
|
||||||
scope.addData("LeasesToTake", leasesToTake.size(), StandardUnit.COUNT, MetricsLevel.DETAILED);
|
scope.addData("LeasesToTake", leasesToTake.size(), StandardUnit.COUNT, MetricsLevel.DETAILED);
|
||||||
scope.addData("NeededLeases", Math.max(numLeasesToReachTarget, 0), StandardUnit.COUNT, MetricsLevel.DETAILED);
|
scope.addData("NeededLeases", Math.max(numLeasesToReachTarget, 0), StandardUnit.COUNT, MetricsLevel.DETAILED);
|
||||||
|
|
@ -575,36 +567,28 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Count leases by host. Always includes myself, but otherwise only includes hosts that are currently holding
|
* Count leases by host, excluding all leases considered 'available to be taken' (ie unassigned or expired)
|
||||||
* leases.
|
* using the supplied list. Always includes myself, but otherwise only includes hosts that are actively
|
||||||
|
* updating leases. There will be no entry for unassigned leases - ie the returned map will only have
|
||||||
|
* entries for non-null worker ids - so a good active-worker count is given by the size of the map.
|
||||||
*
|
*
|
||||||
* @param expiredLeases list of leases that are currently expired
|
* @param excludingAvailableLeases list of 'available' (unassigned/expired) leases to exclude from the count
|
||||||
* @return map of workerIdentifier to lease count
|
* @return map of workerIdentifier to lease count
|
||||||
*/
|
*/
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
Map<String, Integer> computeLeaseCounts(List<Lease> expiredLeases) {
|
Map<String, Integer> computeActiveLeaseCountsByWorker(List<Lease> excludingAvailableLeases) {
|
||||||
Map<String, Integer> leaseCounts = new HashMap<>();
|
|
||||||
// The set will give much faster lookup than the original list, an
|
// The set will give much faster lookup than the original list, an
|
||||||
// important optimization when the list is large
|
// important optimization when the list is large
|
||||||
Set<Lease> expiredLeasesSet = new HashSet<>(expiredLeases);
|
Set<Lease> availableLeasesSet = new HashSet<>(excludingAvailableLeases);
|
||||||
|
|
||||||
// Compute the number of leases per worker by looking through allLeases and ignoring leases that have expired.
|
Map<String, Integer> activeLeaseCountsByWorker = allLeases.values().stream()
|
||||||
for (Lease lease : allLeases.values()) {
|
.filter(lease -> !lease.isUnassigned() && !availableLeasesSet.contains(lease))
|
||||||
if (!expiredLeasesSet.contains(lease)) {
|
.collect(groupingBy(Lease::leaseOwner, summingInt(lease -> 1)));
|
||||||
String leaseOwner = lease.leaseOwner();
|
|
||||||
Integer oldCount = leaseCounts.get(leaseOwner);
|
|
||||||
if (oldCount == null) {
|
|
||||||
leaseCounts.put(leaseOwner, 1);
|
|
||||||
} else {
|
|
||||||
leaseCounts.put(leaseOwner, oldCount + 1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// If I have no leases, I wasn't represented in leaseCounts. Let's fix that.
|
// If I have no leases, I won't be represented in activeLeaseCountsByWorker. Let's fix that.
|
||||||
leaseCounts.putIfAbsent(workerIdentifier, 0);
|
activeLeaseCountsByWorker.putIfAbsent(workerIdentifier, 0);
|
||||||
|
|
||||||
return leaseCounts;
|
return activeLeaseCountsByWorker;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -111,7 +111,7 @@ public class DynamoDBLeaseTakerTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void test_computeLeaseCounts_noExpiredLease() throws Exception {
|
public void test_computeActiveLeaseCountsByWorker_noAvailableLeases() throws Exception {
|
||||||
final List<Lease> leases = new ImmutableList.Builder<Lease>()
|
final List<Lease> leases = new ImmutableList.Builder<Lease>()
|
||||||
.add(createLease(null, "1"))
|
.add(createLease(null, "1"))
|
||||||
.add(createLease("foo", "2"))
|
.add(createLease("foo", "2"))
|
||||||
|
|
@ -125,10 +125,9 @@ public class DynamoDBLeaseTakerTest {
|
||||||
when(metricsFactory.createMetrics()).thenReturn(new NullMetricsScope());
|
when(metricsFactory.createMetrics()).thenReturn(new NullMetricsScope());
|
||||||
when(timeProvider.call()).thenReturn(1000L);
|
when(timeProvider.call()).thenReturn(1000L);
|
||||||
|
|
||||||
final Map<String, Integer> actualOutput = dynamoDBLeaseTaker.computeLeaseCounts(ImmutableList.of());
|
final Map<String, Integer> actualOutput = dynamoDBLeaseTaker.computeActiveLeaseCountsByWorker(ImmutableList.of());
|
||||||
|
|
||||||
final Map<String, Integer> expectedOutput = new HashMap<>();
|
final Map<String, Integer> expectedOutput = new HashMap<>();
|
||||||
expectedOutput.put(null, 1);
|
|
||||||
expectedOutput.put("foo", 1);
|
expectedOutput.put("foo", 1);
|
||||||
expectedOutput.put("bar", 1);
|
expectedOutput.put("bar", 1);
|
||||||
expectedOutput.put("baz", 1);
|
expectedOutput.put("baz", 1);
|
||||||
|
|
@ -136,7 +135,7 @@ public class DynamoDBLeaseTakerTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void test_computeLeaseCounts_withExpiredLease() throws Exception {
|
public void test_computeActiveLeaseCountsByWorker_withAvailableLeases() throws Exception {
|
||||||
final List<Lease> leases = new ImmutableList.Builder<Lease>()
|
final List<Lease> leases = new ImmutableList.Builder<Lease>()
|
||||||
.add(createLease("foo", "2"))
|
.add(createLease("foo", "2"))
|
||||||
.add(createLease("bar", "3"))
|
.add(createLease("bar", "3"))
|
||||||
|
|
@ -149,7 +148,7 @@ public class DynamoDBLeaseTakerTest {
|
||||||
when(metricsFactory.createMetrics()).thenReturn(new NullMetricsScope());
|
when(metricsFactory.createMetrics()).thenReturn(new NullMetricsScope());
|
||||||
when(timeProvider.call()).thenReturn(1000L);
|
when(timeProvider.call()).thenReturn(1000L);
|
||||||
|
|
||||||
final Map<String, Integer> actualOutput = dynamoDBLeaseTaker.computeLeaseCounts(leases);
|
final Map<String, Integer> actualOutput = dynamoDBLeaseTaker.computeActiveLeaseCountsByWorker(leases);
|
||||||
|
|
||||||
final Map<String, Integer> expectedOutput = new HashMap<>();
|
final Map<String, Integer> expectedOutput = new HashMap<>();
|
||||||
expectedOutput.put("foo", 0);
|
expectedOutput.put("foo", 0);
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue