From c68ab705bd536b0fbaa1ad5872e59b41f727bbdb Mon Sep 17 00:00:00 2001 From: Renju Radhakrishnan Date: Fri, 17 Jul 2020 14:48:25 -0700 Subject: [PATCH] Fix NonGreedyTake integration test and add extra integration test (#80) * Fix NonGreedyTake integration test and add extra integration test for very old leases * Fix spacing in comment --- .../leases/dynamodb/DynamoDBLeaseTaker.java | 16 ++++++++++-- .../DynamoDBLeaseTakerIntegrationTest.java | 26 +++++++++++++++---- 2 files changed, 35 insertions(+), 7 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java index 3ec2de22..4249f32b 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java @@ -48,7 +48,7 @@ import software.amazon.kinesis.metrics.MetricsUtil; public class DynamoDBLeaseTaker implements LeaseTaker { private static final int TAKE_RETRIES = 3; private static final int SCAN_RETRIES = 1; - private static final int VERY_OLD_LEASE_DURATION_NANOS_MULTIPLIER = 3; + private long veryOldLeaseDurationNanosMultiplier = 3; // See note on TAKE_LEASES_DIMENSION(Callable) for why we have this callable. private static final Callable SYSTEM_CLOCK_CALLABLE = System::nanoTime; @@ -96,6 +96,18 @@ public class DynamoDBLeaseTaker implements LeaseTaker { return this; } + /** + * Overrides the default very old lease duration nanos multiplier to increase the threshold for taking very old leases. + * Setting this to a higher value than 3 will increase the threshold for very old lease taking. + * + * @param veryOldLeaseDurationNanosMultipler Very old lease duration multiplier for adjusting very old lease taking. + * @return LeaseTaker + */ + public DynamoDBLeaseTaker withVeryOldLeaseDurationNanosMultipler(long veryOldLeaseDurationNanosMultipler) { + this.veryOldLeaseDurationNanosMultiplier = veryOldLeaseDurationNanosMultipler; + return this; + } + /** * Max leases to steal from a more loaded Worker at one time (for load balancing). * Setting this to a higher number can allow for faster load convergence (e.g. during deployments, cold starts), @@ -382,7 +394,7 @@ public class DynamoDBLeaseTaker implements LeaseTaker { // later) but obeying the maximum limit per worker. veryOldLeases = allLeases.values().stream() .filter(lease -> System.nanoTime() - lease.lastCounterIncrementNanos() - > VERY_OLD_LEASE_DURATION_NANOS_MULTIPLIER * leaseDurationNanos) + > veryOldLeaseDurationNanosMultiplier * leaseDurationNanos) .collect(Collectors.toList()); if (!veryOldLeases.isEmpty()) { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerIntegrationTest.java index 2e3d65a4..972d3951 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerIntegrationTest.java @@ -16,22 +16,17 @@ package software.amazon.kinesis.leases.dynamodb; import java.util.Collection; import java.util.Map; - import org.junit.Assert; import org.junit.Before; import org.junit.Test; - import org.junit.runner.RunWith; import org.mockito.runners.MockitoJUnitRunner; import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.LeaseIntegrationTest; import software.amazon.kinesis.leases.exceptions.LeasingException; import software.amazon.kinesis.metrics.NullMetricsFactory; - import static org.hamcrest.CoreMatchers.equalTo; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; @RunWith(MockitoJUnitRunner.class) public class DynamoDBLeaseTakerIntegrationTest extends LeaseIntegrationTest { @@ -105,9 +100,30 @@ public class DynamoDBLeaseTakerIntegrationTest extends LeaseIntegrationTest { builder.withLease("4", "bar").build(); + // setting multiplier to unusually high number to avoid very old lease taking + taker.withVeryOldLeaseDurationNanosMultipler(5000000000L); builder.takeMutateAssert(taker, 2); } + /** + * Verify that we take all very old leases by setting up an environment where there are 4 leases and 2 workers, + * only one of which holds a lease. This leaves 3 free leases. LeaseTaker should take all 3 leases since they + * are denoted as very old. + */ + @Test + public void testVeryOldLeaseTaker() throws LeasingException { + TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher); + + for (int i = 0; i < 3; i++) { + builder.withLease(Integer.toString(i), null); + } + + builder.withLease("4", "bar").build(); + + // setting multiplier to unusually high number to avoid very old lease taking + builder.takeMutateAssert(taker, 3); + } + /** * Verify that when getAllLeases() is called, DynamoDBLeaseTaker * - does not call listLeases()