Added tests for expired leases.
This commit is contained in:
parent
6ce02ae181
commit
e66493da28
2 changed files with 38 additions and 10 deletions
|
|
@ -14,6 +14,8 @@
|
||||||
*/
|
*/
|
||||||
package software.amazon.kinesis.leases.dynamodb;
|
package software.amazon.kinesis.leases.dynamodb;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
|
@ -40,9 +42,6 @@ import software.amazon.kinesis.metrics.MetricsLevel;
|
||||||
import software.amazon.kinesis.metrics.MetricsScope;
|
import software.amazon.kinesis.metrics.MetricsScope;
|
||||||
import software.amazon.kinesis.metrics.MetricsUtil;
|
import software.amazon.kinesis.metrics.MetricsUtil;
|
||||||
|
|
||||||
import static java.util.stream.Collectors.groupingBy;
|
|
||||||
import static java.util.stream.Collectors.summingInt;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An implementation of {@link LeaseTaker} that uses DynamoDB via {@link LeaseRefresher}.
|
* An implementation of {@link LeaseTaker} that uses DynamoDB via {@link LeaseRefresher}.
|
||||||
*/
|
*/
|
||||||
|
|
@ -63,13 +62,14 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
|
||||||
private final long leaseDurationNanos;
|
private final long leaseDurationNanos;
|
||||||
private final MetricsFactory metricsFactory;
|
private final MetricsFactory metricsFactory;
|
||||||
|
|
||||||
private final Map<String, Lease> allLeases = new HashMap<>();
|
|
||||||
// TODO: Remove these defaults and use the defaults in the config
|
// TODO: Remove these defaults and use the defaults in the config
|
||||||
private int maxLeasesForWorker = Integer.MAX_VALUE;
|
private int maxLeasesForWorker = Integer.MAX_VALUE;
|
||||||
private int maxLeasesToStealAtOneTime = 1;
|
private int maxLeasesToStealAtOneTime = 1;
|
||||||
|
|
||||||
private long lastScanTimeNanos = 0L;
|
private long lastScanTimeNanos = 0L;
|
||||||
|
|
||||||
|
final Map<String, Lease> allLeases = new HashMap<>();
|
||||||
|
|
||||||
public DynamoDBLeaseTaker(LeaseRefresher leaseRefresher, String workerIdentifier, long leaseDurationMillis,
|
public DynamoDBLeaseTaker(LeaseRefresher leaseRefresher, String workerIdentifier, long leaseDurationMillis,
|
||||||
final MetricsFactory metricsFactory) {
|
final MetricsFactory metricsFactory) {
|
||||||
this.leaseRefresher = leaseRefresher;
|
this.leaseRefresher = leaseRefresher;
|
||||||
|
|
@ -542,7 +542,8 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
|
||||||
* @param expiredLeases list of leases that are currently expired
|
* @param expiredLeases list of leases that are currently expired
|
||||||
* @return map of workerIdentifier to lease count
|
* @return map of workerIdentifier to lease count
|
||||||
*/
|
*/
|
||||||
private Map<String, Integer> computeLeaseCounts(List<Lease> expiredLeases) {
|
@VisibleForTesting
|
||||||
|
Map<String, Integer> computeLeaseCounts(List<Lease> expiredLeases) {
|
||||||
Map<String, Integer> leaseCounts = new HashMap<>();
|
Map<String, Integer> leaseCounts = new HashMap<>();
|
||||||
// The set will give much faster lookup than the original list, an
|
// The set will give much faster lookup than the original list, an
|
||||||
// important optimization when the list is large
|
// important optimization when the list is large
|
||||||
|
|
|
||||||
|
|
@ -14,10 +14,12 @@
|
||||||
*/
|
*/
|
||||||
package software.amazon.kinesis.leases.dynamodb;
|
package software.amazon.kinesis.leases.dynamodb;
|
||||||
|
|
||||||
|
import com.google.common.collect.ImmutableList;
|
||||||
import com.google.common.collect.ImmutableMap;
|
import com.google.common.collect.ImmutableMap;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
@ -105,10 +107,9 @@ public class DynamoDBLeaseTakerTest {
|
||||||
Assert.assertEquals("foo, bar", DynamoDBLeaseTaker.stringJoin(strings, ", "));
|
Assert.assertEquals("foo, bar", DynamoDBLeaseTaker.stringJoin(strings, ", "));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void test_computeLeaseCounts() throws Exception {
|
public void test_computeLeaseCounts_noExpiredLease() throws Exception {
|
||||||
Lease lease = new Lease();
|
final Lease lease = new Lease();
|
||||||
lease.checkpoint(new ExtendedSequenceNumber("checkpoint"));
|
lease.checkpoint(new ExtendedSequenceNumber("checkpoint"));
|
||||||
lease.ownerSwitchesSinceCheckpoint(0L);
|
lease.ownerSwitchesSinceCheckpoint(0L);
|
||||||
lease.leaseCounter(0L);
|
lease.leaseCounter(0L);
|
||||||
|
|
@ -117,13 +118,39 @@ public class DynamoDBLeaseTakerTest {
|
||||||
lease.childShardIds(new HashSet<>());
|
lease.childShardIds(new HashSet<>());
|
||||||
lease.leaseKey("1");
|
lease.leaseKey("1");
|
||||||
final List<Lease> leases = Collections.singletonList(lease);
|
final List<Lease> leases = Collections.singletonList(lease);
|
||||||
|
dynamoDBLeaseTaker.allLeases.put("1", lease);
|
||||||
|
|
||||||
when(leaseRefresher.listLeases()).thenReturn(leases);
|
when(leaseRefresher.listLeases()).thenReturn(leases);
|
||||||
when(metricsFactory.createMetrics()).thenReturn(new NullMetricsScope());
|
when(metricsFactory.createMetrics()).thenReturn(new NullMetricsScope());
|
||||||
when(timeProvider.call()).thenReturn(1000L);
|
when(timeProvider.call()).thenReturn(1000L);
|
||||||
|
|
||||||
Map<String, Lease> actualOutput = dynamoDBLeaseTaker.takeLeases(timeProvider);
|
final Map<String, Integer> actualOutput = dynamoDBLeaseTaker.computeLeaseCounts(ImmutableList.of());
|
||||||
|
|
||||||
assertEquals(ImmutableMap.of(), actualOutput);
|
final Map<String, Integer> expectedOutput = new HashMap<>();
|
||||||
|
expectedOutput.put(null, 1);
|
||||||
|
expectedOutput.put("foo", 0);
|
||||||
|
assertEquals(expectedOutput, actualOutput);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void test_computeLeaseCounts_withExpiredLease() throws Exception {
|
||||||
|
final Lease lease = new Lease();
|
||||||
|
lease.checkpoint(new ExtendedSequenceNumber("checkpoint"));
|
||||||
|
lease.ownerSwitchesSinceCheckpoint(0L);
|
||||||
|
lease.leaseCounter(0L);
|
||||||
|
lease.leaseOwner(null);
|
||||||
|
lease.parentShardIds(Collections.singleton("parentShardId"));
|
||||||
|
lease.childShardIds(new HashSet<>());
|
||||||
|
lease.leaseKey("1");
|
||||||
|
final List<Lease> leases = Collections.singletonList(lease);
|
||||||
|
dynamoDBLeaseTaker.allLeases.put("1", lease);
|
||||||
|
|
||||||
|
when(leaseRefresher.listLeases()).thenReturn(leases);
|
||||||
|
when(metricsFactory.createMetrics()).thenReturn(new NullMetricsScope());
|
||||||
|
when(timeProvider.call()).thenReturn(1000L);
|
||||||
|
|
||||||
|
final Map<String, Integer> actualOutput = dynamoDBLeaseTaker.computeLeaseCounts(leases);
|
||||||
|
|
||||||
|
assertEquals(ImmutableMap.of("foo", 0), actualOutput);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue