Fix NonGreedyTake integration test and add extra integration test (#80)
* Fix NonGreedyTake integration test and add extra integration test for very old leases * Fix spacing in comment
This commit is contained in:
parent
afd0f7c08a
commit
c68ab705bd
2 changed files with 35 additions and 7 deletions
|
|
@ -48,7 +48,7 @@ import software.amazon.kinesis.metrics.MetricsUtil;
|
||||||
public class DynamoDBLeaseTaker implements LeaseTaker {
|
public class DynamoDBLeaseTaker implements LeaseTaker {
|
||||||
private static final int TAKE_RETRIES = 3;
|
private static final int TAKE_RETRIES = 3;
|
||||||
private static final int SCAN_RETRIES = 1;
|
private static final int SCAN_RETRIES = 1;
|
||||||
private static final int VERY_OLD_LEASE_DURATION_NANOS_MULTIPLIER = 3;
|
private long veryOldLeaseDurationNanosMultiplier = 3;
|
||||||
|
|
||||||
// See note on TAKE_LEASES_DIMENSION(Callable) for why we have this callable.
|
// See note on TAKE_LEASES_DIMENSION(Callable) for why we have this callable.
|
||||||
private static final Callable<Long> SYSTEM_CLOCK_CALLABLE = System::nanoTime;
|
private static final Callable<Long> SYSTEM_CLOCK_CALLABLE = System::nanoTime;
|
||||||
|
|
@ -96,6 +96,18 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Overrides the default very old lease duration nanos multiplier to increase the threshold for taking very old leases.
|
||||||
|
* Setting this to a higher value than 3 will increase the threshold for very old lease taking.
|
||||||
|
*
|
||||||
|
* @param veryOldLeaseDurationNanosMultipler Very old lease duration multiplier for adjusting very old lease taking.
|
||||||
|
* @return LeaseTaker
|
||||||
|
*/
|
||||||
|
public DynamoDBLeaseTaker withVeryOldLeaseDurationNanosMultipler(long veryOldLeaseDurationNanosMultipler) {
|
||||||
|
this.veryOldLeaseDurationNanosMultiplier = veryOldLeaseDurationNanosMultipler;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Max leases to steal from a more loaded Worker at one time (for load balancing).
|
* Max leases to steal from a more loaded Worker at one time (for load balancing).
|
||||||
* Setting this to a higher number can allow for faster load convergence (e.g. during deployments, cold starts),
|
* Setting this to a higher number can allow for faster load convergence (e.g. during deployments, cold starts),
|
||||||
|
|
@ -382,7 +394,7 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
|
||||||
// later) but obeying the maximum limit per worker.
|
// later) but obeying the maximum limit per worker.
|
||||||
veryOldLeases = allLeases.values().stream()
|
veryOldLeases = allLeases.values().stream()
|
||||||
.filter(lease -> System.nanoTime() - lease.lastCounterIncrementNanos()
|
.filter(lease -> System.nanoTime() - lease.lastCounterIncrementNanos()
|
||||||
> VERY_OLD_LEASE_DURATION_NANOS_MULTIPLIER * leaseDurationNanos)
|
> veryOldLeaseDurationNanosMultiplier * leaseDurationNanos)
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
|
|
||||||
if (!veryOldLeases.isEmpty()) {
|
if (!veryOldLeases.isEmpty()) {
|
||||||
|
|
|
||||||
|
|
@ -16,22 +16,17 @@ package software.amazon.kinesis.leases.dynamodb;
|
||||||
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import org.junit.runner.RunWith;
|
import org.junit.runner.RunWith;
|
||||||
import org.mockito.runners.MockitoJUnitRunner;
|
import org.mockito.runners.MockitoJUnitRunner;
|
||||||
import software.amazon.kinesis.leases.Lease;
|
import software.amazon.kinesis.leases.Lease;
|
||||||
import software.amazon.kinesis.leases.LeaseIntegrationTest;
|
import software.amazon.kinesis.leases.LeaseIntegrationTest;
|
||||||
import software.amazon.kinesis.leases.exceptions.LeasingException;
|
import software.amazon.kinesis.leases.exceptions.LeasingException;
|
||||||
import software.amazon.kinesis.metrics.NullMetricsFactory;
|
import software.amazon.kinesis.metrics.NullMetricsFactory;
|
||||||
|
|
||||||
import static org.hamcrest.CoreMatchers.equalTo;
|
import static org.hamcrest.CoreMatchers.equalTo;
|
||||||
import static org.junit.Assert.assertEquals;
|
|
||||||
import static org.junit.Assert.assertThat;
|
import static org.junit.Assert.assertThat;
|
||||||
import static org.junit.Assert.assertTrue;
|
|
||||||
|
|
||||||
@RunWith(MockitoJUnitRunner.class)
|
@RunWith(MockitoJUnitRunner.class)
|
||||||
public class DynamoDBLeaseTakerIntegrationTest extends LeaseIntegrationTest {
|
public class DynamoDBLeaseTakerIntegrationTest extends LeaseIntegrationTest {
|
||||||
|
|
@ -105,9 +100,30 @@ public class DynamoDBLeaseTakerIntegrationTest extends LeaseIntegrationTest {
|
||||||
|
|
||||||
builder.withLease("4", "bar").build();
|
builder.withLease("4", "bar").build();
|
||||||
|
|
||||||
|
// setting multiplier to unusually high number to avoid very old lease taking
|
||||||
|
taker.withVeryOldLeaseDurationNanosMultipler(5000000000L);
|
||||||
builder.takeMutateAssert(taker, 2);
|
builder.takeMutateAssert(taker, 2);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verify that we take all very old leases by setting up an environment where there are 4 leases and 2 workers,
|
||||||
|
* only one of which holds a lease. This leaves 3 free leases. LeaseTaker should take all 3 leases since they
|
||||||
|
* are denoted as very old.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testVeryOldLeaseTaker() throws LeasingException {
|
||||||
|
TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher);
|
||||||
|
|
||||||
|
for (int i = 0; i < 3; i++) {
|
||||||
|
builder.withLease(Integer.toString(i), null);
|
||||||
|
}
|
||||||
|
|
||||||
|
builder.withLease("4", "bar").build();
|
||||||
|
|
||||||
|
// setting multiplier to unusually high number to avoid very old lease taking
|
||||||
|
builder.takeMutateAssert(taker, 3);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Verify that when getAllLeases() is called, DynamoDBLeaseTaker
|
* Verify that when getAllLeases() is called, DynamoDBLeaseTaker
|
||||||
* - does not call listLeases()
|
* - does not call listLeases()
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue