Get latest counter before attempting a take to ensure take succeeds
This commit is contained in:
parent
b48de6d1bf
commit
a323272a97
6 changed files with 145 additions and 20 deletions
|
|
@ -0,0 +1,31 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2020 Amazon.com, Inc. or its affiliates.
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package software.amazon.kinesis.common;
|
||||||
|
|
||||||
|
|
||||||
|
public class CommonCalculations {
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Convenience method for calculating renewer intervals in milliseconds.
|
||||||
|
*
|
||||||
|
* @param leaseDurationMillis Duration of a lease
|
||||||
|
* @param epsilonMillis Allow for some variance when calculating lease expirations
|
||||||
|
*/
|
||||||
|
public static long getRenewerTakerIntervalMillis(long leaseDurationMillis, long epsilonMillis) {
|
||||||
|
return leaseDurationMillis / 3 - epsilonMillis;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -14,20 +14,20 @@
|
||||||
*/
|
*/
|
||||||
package software.amazon.kinesis.leases;
|
package software.amazon.kinesis.leases;
|
||||||
|
|
||||||
import lombok.EqualsAndHashCode;
|
|
||||||
import lombok.Getter;
|
|
||||||
import lombok.NoArgsConstructor;
|
|
||||||
import lombok.NonNull;
|
|
||||||
import lombok.ToString;
|
|
||||||
import lombok.experimental.Accessors;
|
|
||||||
import software.amazon.kinesis.common.HashKeyRangeForLease;
|
|
||||||
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
|
|
||||||
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import lombok.EqualsAndHashCode;
|
||||||
|
import lombok.Getter;
|
||||||
|
import lombok.NoArgsConstructor;
|
||||||
|
import lombok.NonNull;
|
||||||
|
import lombok.Setter;
|
||||||
|
import lombok.ToString;
|
||||||
|
import lombok.experimental.Accessors;
|
||||||
|
import software.amazon.kinesis.common.HashKeyRangeForLease;
|
||||||
|
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class contains data pertaining to a Lease. Distributed systems may use leases to partition work across a
|
* This class contains data pertaining to a Lease. Distributed systems may use leases to partition work across a
|
||||||
|
|
@ -39,7 +39,7 @@ import java.util.concurrent.TimeUnit;
|
||||||
@NoArgsConstructor
|
@NoArgsConstructor
|
||||||
@Getter
|
@Getter
|
||||||
@Accessors(fluent = true)
|
@Accessors(fluent = true)
|
||||||
@EqualsAndHashCode(exclude = {"concurrencyToken", "lastCounterIncrementNanos", "childShardIds", "pendingCheckpointState"})
|
@EqualsAndHashCode(exclude = {"concurrencyToken", "lastCounterIncrementNanos", "childShardIds", "pendingCheckpointState", "isMarkedForLeaseSteal"})
|
||||||
@ToString
|
@ToString
|
||||||
public class Lease {
|
public class Lease {
|
||||||
/*
|
/*
|
||||||
|
|
@ -91,6 +91,16 @@ public class Lease {
|
||||||
*/
|
*/
|
||||||
private byte[] pendingCheckpointState;
|
private byte[] pendingCheckpointState;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Denotes whether the lease is marked for stealing. Deliberately excluded from hashCode and equals and
|
||||||
|
* not persisted in DynamoDB.
|
||||||
|
*
|
||||||
|
* @return flag for denoting lease is marked for stealing.
|
||||||
|
*/
|
||||||
|
@Setter
|
||||||
|
private boolean isMarkedForLeaseSteal;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return count of distinct lease holders between checkpoints.
|
* @return count of distinct lease holders between checkpoints.
|
||||||
*/
|
*/
|
||||||
|
|
@ -141,6 +151,7 @@ public class Lease {
|
||||||
}
|
}
|
||||||
this.hashKeyRangeForLease = hashKeyRangeForLease;
|
this.hashKeyRangeForLease = hashKeyRangeForLease;
|
||||||
this.pendingCheckpointState = pendingCheckpointState;
|
this.pendingCheckpointState = pendingCheckpointState;
|
||||||
|
this.isMarkedForLeaseSteal = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -14,6 +14,8 @@
|
||||||
*/
|
*/
|
||||||
package software.amazon.kinesis.leases.dynamodb;
|
package software.amazon.kinesis.leases.dynamodb;
|
||||||
|
|
||||||
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
@ -28,9 +30,6 @@ import java.util.concurrent.ThreadFactory;
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
|
||||||
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
|
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
|
||||||
import software.amazon.kinesis.leases.Lease;
|
import software.amazon.kinesis.leases.Lease;
|
||||||
|
|
@ -48,6 +47,7 @@ import software.amazon.kinesis.metrics.MetricsFactory;
|
||||||
import software.amazon.kinesis.metrics.MetricsLevel;
|
import software.amazon.kinesis.metrics.MetricsLevel;
|
||||||
import software.amazon.kinesis.metrics.MetricsScope;
|
import software.amazon.kinesis.metrics.MetricsScope;
|
||||||
import software.amazon.kinesis.metrics.MetricsUtil;
|
import software.amazon.kinesis.metrics.MetricsUtil;
|
||||||
|
import static software.amazon.kinesis.common.CommonCalculations.*;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* LeaseCoordinator abstracts away LeaseTaker and LeaseRenewer from the application code that's using leasing. It owns
|
* LeaseCoordinator abstracts away LeaseTaker and LeaseRenewer from the application code that's using leasing. It owns
|
||||||
|
|
@ -156,7 +156,7 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
|
||||||
.withMaxLeasesToStealAtOneTime(maxLeasesToStealAtOneTime);
|
.withMaxLeasesToStealAtOneTime(maxLeasesToStealAtOneTime);
|
||||||
this.leaseRenewer = new DynamoDBLeaseRenewer(
|
this.leaseRenewer = new DynamoDBLeaseRenewer(
|
||||||
leaseRefresher, workerIdentifier, leaseDurationMillis, leaseRenewalThreadpool, metricsFactory);
|
leaseRefresher, workerIdentifier, leaseDurationMillis, leaseRenewalThreadpool, metricsFactory);
|
||||||
this.renewerIntervalMillis = leaseDurationMillis / 3 - epsilonMillis;
|
this.renewerIntervalMillis = getRenewerTakerIntervalMillis(leaseDurationMillis, epsilonMillis);
|
||||||
this.takerIntervalMillis = (leaseDurationMillis + epsilonMillis) * 2;
|
this.takerIntervalMillis = (leaseDurationMillis + epsilonMillis) * 2;
|
||||||
if (initialLeaseTableReadCapacity <= 0) {
|
if (initialLeaseTableReadCapacity <= 0) {
|
||||||
throw new IllegalArgumentException("readCapacity should be >= 1");
|
throw new IllegalArgumentException("readCapacity should be >= 1");
|
||||||
|
|
|
||||||
|
|
@ -41,6 +41,7 @@ import software.amazon.kinesis.metrics.MetricsFactory;
|
||||||
import software.amazon.kinesis.metrics.MetricsLevel;
|
import software.amazon.kinesis.metrics.MetricsLevel;
|
||||||
import software.amazon.kinesis.metrics.MetricsScope;
|
import software.amazon.kinesis.metrics.MetricsScope;
|
||||||
import software.amazon.kinesis.metrics.MetricsUtil;
|
import software.amazon.kinesis.metrics.MetricsUtil;
|
||||||
|
import static software.amazon.kinesis.common.CommonCalculations.*;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An implementation of {@link LeaseTaker} that uses DynamoDB via {@link LeaseRefresher}.
|
* An implementation of {@link LeaseTaker} that uses DynamoDB via {@link LeaseRefresher}.
|
||||||
|
|
@ -60,20 +61,23 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
|
||||||
private final LeaseRefresher leaseRefresher;
|
private final LeaseRefresher leaseRefresher;
|
||||||
private final String workerIdentifier;
|
private final String workerIdentifier;
|
||||||
private final long leaseDurationNanos;
|
private final long leaseDurationNanos;
|
||||||
|
private final long leaseRenewalIntervalMillis;
|
||||||
private final MetricsFactory metricsFactory;
|
private final MetricsFactory metricsFactory;
|
||||||
|
|
||||||
|
private final double RENEWAL_SLACK_PERCENTAGE = 0.5;
|
||||||
|
|
||||||
|
private final Map<String, Lease> allLeases = new HashMap<>();
|
||||||
// TODO: Remove these defaults and use the defaults in the config
|
// TODO: Remove these defaults and use the defaults in the config
|
||||||
private int maxLeasesForWorker = Integer.MAX_VALUE;
|
private int maxLeasesForWorker = Integer.MAX_VALUE;
|
||||||
private int maxLeasesToStealAtOneTime = 1;
|
private int maxLeasesToStealAtOneTime = 1;
|
||||||
|
|
||||||
private long lastScanTimeNanos = 0L;
|
private long lastScanTimeNanos = 0L;
|
||||||
|
|
||||||
final Map<String, Lease> allLeases = new HashMap<>();
|
|
||||||
|
|
||||||
public DynamoDBLeaseTaker(LeaseRefresher leaseRefresher, String workerIdentifier, long leaseDurationMillis,
|
public DynamoDBLeaseTaker(LeaseRefresher leaseRefresher, String workerIdentifier, long leaseDurationMillis,
|
||||||
final MetricsFactory metricsFactory) {
|
final MetricsFactory metricsFactory) {
|
||||||
this.leaseRefresher = leaseRefresher;
|
this.leaseRefresher = leaseRefresher;
|
||||||
this.workerIdentifier = workerIdentifier;
|
this.workerIdentifier = workerIdentifier;
|
||||||
|
this.leaseRenewalIntervalMillis = getRenewerTakerIntervalMillis(leaseDurationMillis, 0);
|
||||||
this.leaseDurationNanos = TimeUnit.MILLISECONDS.toNanos(leaseDurationMillis);
|
this.leaseDurationNanos = TimeUnit.MILLISECONDS.toNanos(leaseDurationMillis);
|
||||||
this.metricsFactory = metricsFactory;
|
this.metricsFactory = metricsFactory;
|
||||||
}
|
}
|
||||||
|
|
@ -156,6 +160,7 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
|
||||||
final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, TAKE_LEASES_DIMENSION);
|
final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, TAKE_LEASES_DIMENSION);
|
||||||
|
|
||||||
long startTime = System.currentTimeMillis();
|
long startTime = System.currentTimeMillis();
|
||||||
|
long updateAllLeasesTotalTimeMillis;
|
||||||
boolean success = false;
|
boolean success = false;
|
||||||
|
|
||||||
ProvisionedThroughputException lastException = null;
|
ProvisionedThroughputException lastException = null;
|
||||||
|
|
@ -173,19 +178,23 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
updateAllLeasesTotalTimeMillis = System.currentTimeMillis() - startTime;
|
||||||
MetricsUtil.addWorkerIdentifier(scope, workerIdentifier);
|
MetricsUtil.addWorkerIdentifier(scope, workerIdentifier);
|
||||||
MetricsUtil.addSuccessAndLatency(scope, "ListLeases", success, startTime, MetricsLevel.DETAILED);
|
MetricsUtil.addSuccessAndLatency(scope, "ListLeases", success, startTime, MetricsLevel.DETAILED);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
if (lastException != null) {
|
if (lastException != null) {
|
||||||
log.error("Worker {} could not scan leases table, aborting TAKE_LEASES_DIMENSION. Exception caught by"
|
log.error("Worker {} could not scan leases table, aborting TAKE_LEASES_DIMENSION. Exception caught by"
|
||||||
+ " last retry:", workerIdentifier, lastException);
|
+ " last retry:", workerIdentifier, lastException);
|
||||||
return takenLeases;
|
return takenLeases;
|
||||||
}
|
}
|
||||||
|
|
||||||
List<Lease> expiredLeases = getExpiredLeases();
|
List<Lease> expiredLeases = getExpiredLeases();
|
||||||
|
|
||||||
Set<Lease> leasesToTake = computeLeasesToTake(expiredLeases);
|
Set<Lease> leasesToTake = computeLeasesToTake(expiredLeases);
|
||||||
|
leasesToTake = updateStaleLeasesWithLatestState(updateAllLeasesTotalTimeMillis, leasesToTake);
|
||||||
|
|
||||||
Set<String> untakenLeaseKeys = new HashSet<>();
|
Set<String> untakenLeaseKeys = new HashSet<>();
|
||||||
|
|
||||||
for (Lease lease : leasesToTake) {
|
for (Lease lease : leasesToTake) {
|
||||||
|
|
@ -233,6 +242,32 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
|
||||||
return takenLeases;
|
return takenLeases;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If update all leases takes longer than the lease renewal time,
|
||||||
|
* we fetch the latest lease info for the given leases that are marked for lease steal.
|
||||||
|
* If nothing is found (or any transient network error occurs),
|
||||||
|
* we default to the last known state of the lease
|
||||||
|
*
|
||||||
|
* @param updateAllLeasesEndTime How long it takes for update all leases to complete
|
||||||
|
* @return set of leases to take.
|
||||||
|
*/
|
||||||
|
private Set<Lease> updateStaleLeasesWithLatestState(long updateAllLeasesEndTime,
|
||||||
|
Set<Lease> leasesToTake) {
|
||||||
|
if (updateAllLeasesEndTime > leaseRenewalIntervalMillis * RENEWAL_SLACK_PERCENTAGE) {
|
||||||
|
leasesToTake = leasesToTake.stream().map(lease -> {
|
||||||
|
if (lease.isMarkedForLeaseSteal()) {
|
||||||
|
try {
|
||||||
|
return leaseRefresher.getLease(lease.leaseKey());
|
||||||
|
} catch (DependencyException | InvalidStateException | ProvisionedThroughputException e) {
|
||||||
|
log.debug("Unable to retrieve the current lease, defaulting to existing lease", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return lease;
|
||||||
|
}).collect(Collectors.toSet());
|
||||||
|
}
|
||||||
|
return leasesToTake;
|
||||||
|
}
|
||||||
|
|
||||||
/** Package access for testing purposes.
|
/** Package access for testing purposes.
|
||||||
*
|
*
|
||||||
* @param strings
|
* @param strings
|
||||||
|
|
@ -530,8 +565,9 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
|
||||||
// Return random ones
|
// Return random ones
|
||||||
Collections.shuffle(candidates);
|
Collections.shuffle(candidates);
|
||||||
int toIndex = Math.min(candidates.size(), numLeasesToSteal);
|
int toIndex = Math.min(candidates.size(), numLeasesToSteal);
|
||||||
leasesToSteal.addAll(candidates.subList(0, toIndex));
|
leasesToSteal.addAll(candidates.subList(0, toIndex).stream()
|
||||||
|
.map(lease -> lease.isMarkedForLeaseSteal(true))
|
||||||
|
.collect(Collectors.toList()));
|
||||||
return leasesToSteal;
|
return leasesToSteal;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -15,7 +15,9 @@
|
||||||
package software.amazon.kinesis.leases.dynamodb;
|
package software.amazon.kinesis.leases.dynamodb;
|
||||||
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
@ -26,6 +28,7 @@ import software.amazon.kinesis.leases.LeaseIntegrationTest;
|
||||||
import software.amazon.kinesis.leases.exceptions.LeasingException;
|
import software.amazon.kinesis.leases.exceptions.LeasingException;
|
||||||
import software.amazon.kinesis.metrics.NullMetricsFactory;
|
import software.amazon.kinesis.metrics.NullMetricsFactory;
|
||||||
import static org.hamcrest.CoreMatchers.equalTo;
|
import static org.hamcrest.CoreMatchers.equalTo;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertThat;
|
import static org.junit.Assert.assertThat;
|
||||||
|
|
||||||
@RunWith(MockitoJUnitRunner.class)
|
@RunWith(MockitoJUnitRunner.class)
|
||||||
|
|
@ -150,6 +153,33 @@ public class DynamoDBLeaseTakerIntegrationTest extends LeaseIntegrationTest {
|
||||||
assertThat(addedLeases.values().containsAll(allLeases), equalTo(true));
|
assertThat(addedLeases.values().containsAll(allLeases), equalTo(true));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the leaseDurationMillis to 0, ensuring a get request to update the existing lease after computing
|
||||||
|
* leases to take
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testSlowGetAllLeases() throws LeasingException {
|
||||||
|
long leaseDurationMillis = 0;
|
||||||
|
taker = new DynamoDBLeaseTaker(leaseRefresher,
|
||||||
|
"foo",
|
||||||
|
leaseDurationMillis,
|
||||||
|
new NullMetricsFactory());
|
||||||
|
TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher);
|
||||||
|
|
||||||
|
Map<String, Lease> addedLeases = builder.withLease("1", "bar")
|
||||||
|
.withLease("2", "bar")
|
||||||
|
.withLease("5", "foo")
|
||||||
|
.build();
|
||||||
|
|
||||||
|
assertThat(taker.allLeases().size(), equalTo(0));
|
||||||
|
taker.takeLeases();
|
||||||
|
|
||||||
|
Collection<Lease> allLeases = taker.allLeases();
|
||||||
|
assertThat(allLeases.size(), equalTo(addedLeases.size()));
|
||||||
|
assertEquals(addedLeases.values().size(), allLeases.size());
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Verify that LeaseTaker does not steal when it's only short 1 lease and the other worker is at target. Set up a
|
* Verify that LeaseTaker does not steal when it's only short 1 lease and the other worker is at target. Set up a
|
||||||
* scenario where there are 4 leases held by two servers, and a third server with one lease. The third server should
|
* scenario where there are 4 leases held by two servers, and a third server with one lease. The third server should
|
||||||
|
|
@ -189,7 +219,7 @@ public class DynamoDBLeaseTakerIntegrationTest extends LeaseIntegrationTest {
|
||||||
builder.build();
|
builder.build();
|
||||||
|
|
||||||
// Assert that one lease was stolen from baz.
|
// Assert that one lease was stolen from baz.
|
||||||
Map<String, Lease> takenLeases = builder.takeMutateAssert(taker, 1);
|
Map<String, Lease> takenLeases = builder.stealMutateAssert(taker, 1);
|
||||||
|
|
||||||
// Assert that it was one of baz's leases (shardId != 1)
|
// Assert that it was one of baz's leases (shardId != 1)
|
||||||
String shardIdStolen = takenLeases.keySet().iterator().next();
|
String shardIdStolen = takenLeases.keySet().iterator().next();
|
||||||
|
|
|
||||||
|
|
@ -111,6 +111,23 @@ public class TestHarnessBuilder {
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Map<String, Lease> stealMutateAssert(DynamoDBLeaseTaker taker, int numToTake)
|
||||||
|
throws LeasingException {
|
||||||
|
Map<String, Lease> result = taker.takeLeases(timeProvider);
|
||||||
|
assertEquals(numToTake, result.size());
|
||||||
|
|
||||||
|
for (Lease actual : result.values()) {
|
||||||
|
Lease original = leases.get(actual.leaseKey());
|
||||||
|
assertNotNull(original);
|
||||||
|
|
||||||
|
original.isMarkedForLeaseSteal(true)
|
||||||
|
.lastCounterIncrementNanos(actual.lastCounterIncrementNanos());
|
||||||
|
mutateAssert(taker.getWorkerIdentifier(), original, actual);
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
public Map<String, Lease> takeMutateAssert(DynamoDBLeaseTaker taker, String... takenShardIds)
|
public Map<String, Lease> takeMutateAssert(DynamoDBLeaseTaker taker, String... takenShardIds)
|
||||||
throws LeasingException {
|
throws LeasingException {
|
||||||
Map<String, Lease> result = taker.takeLeases(timeProvider);
|
Map<String, Lease> result = taker.takeLeases(timeProvider);
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue