diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java index 9c2a3d03..e7087e62 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java @@ -14,6 +14,8 @@ */ package software.amazon.kinesis.leases.dynamodb; +import com.google.common.annotations.VisibleForTesting; + import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -40,9 +42,6 @@ import software.amazon.kinesis.metrics.MetricsLevel; import software.amazon.kinesis.metrics.MetricsScope; import software.amazon.kinesis.metrics.MetricsUtil; -import static java.util.stream.Collectors.groupingBy; -import static java.util.stream.Collectors.summingInt; - /** * An implementation of {@link LeaseTaker} that uses DynamoDB via {@link LeaseRefresher}. */ @@ -63,13 +62,14 @@ public class DynamoDBLeaseTaker implements LeaseTaker { private final long leaseDurationNanos; private final MetricsFactory metricsFactory; - private final Map allLeases = new HashMap<>(); // TODO: Remove these defaults and use the defaults in the config private int maxLeasesForWorker = Integer.MAX_VALUE; private int maxLeasesToStealAtOneTime = 1; private long lastScanTimeNanos = 0L; + final Map allLeases = new HashMap<>(); + public DynamoDBLeaseTaker(LeaseRefresher leaseRefresher, String workerIdentifier, long leaseDurationMillis, final MetricsFactory metricsFactory) { this.leaseRefresher = leaseRefresher; @@ -542,7 +542,8 @@ public class DynamoDBLeaseTaker implements LeaseTaker { * @param expiredLeases list of leases that are currently expired * @return map of workerIdentifier to lease count */ - private Map computeLeaseCounts(List expiredLeases) { + @VisibleForTesting + Map computeLeaseCounts(List expiredLeases) { Map leaseCounts = new HashMap<>(); // The set will give much faster lookup than the original list, an // important optimization when the list is large diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerTest.java index 016ebea6..0dd9e489 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerTest.java @@ -14,10 +14,12 @@ */ package software.amazon.kinesis.leases.dynamodb; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -105,10 +107,9 @@ public class DynamoDBLeaseTakerTest { Assert.assertEquals("foo, bar", DynamoDBLeaseTaker.stringJoin(strings, ", ")); } - @Test - public void test_computeLeaseCounts() throws Exception { - Lease lease = new Lease(); + public void test_computeLeaseCounts_noExpiredLease() throws Exception { + final Lease lease = new Lease(); lease.checkpoint(new ExtendedSequenceNumber("checkpoint")); lease.ownerSwitchesSinceCheckpoint(0L); lease.leaseCounter(0L); @@ -117,13 +118,39 @@ public class DynamoDBLeaseTakerTest { lease.childShardIds(new HashSet<>()); lease.leaseKey("1"); final List leases = Collections.singletonList(lease); + dynamoDBLeaseTaker.allLeases.put("1", lease); when(leaseRefresher.listLeases()).thenReturn(leases); when(metricsFactory.createMetrics()).thenReturn(new NullMetricsScope()); when(timeProvider.call()).thenReturn(1000L); - Map actualOutput = dynamoDBLeaseTaker.takeLeases(timeProvider); + final Map actualOutput = dynamoDBLeaseTaker.computeLeaseCounts(ImmutableList.of()); - assertEquals(ImmutableMap.of(), actualOutput); + final Map expectedOutput = new HashMap<>(); + expectedOutput.put(null, 1); + expectedOutput.put("foo", 0); + assertEquals(expectedOutput, actualOutput); + } + + @Test + public void test_computeLeaseCounts_withExpiredLease() throws Exception { + final Lease lease = new Lease(); + lease.checkpoint(new ExtendedSequenceNumber("checkpoint")); + lease.ownerSwitchesSinceCheckpoint(0L); + lease.leaseCounter(0L); + lease.leaseOwner(null); + lease.parentShardIds(Collections.singleton("parentShardId")); + lease.childShardIds(new HashSet<>()); + lease.leaseKey("1"); + final List leases = Collections.singletonList(lease); + dynamoDBLeaseTaker.allLeases.put("1", lease); + + when(leaseRefresher.listLeases()).thenReturn(leases); + when(metricsFactory.createMetrics()).thenReturn(new NullMetricsScope()); + when(timeProvider.call()).thenReturn(1000L); + + final Map actualOutput = dynamoDBLeaseTaker.computeLeaseCounts(leases); + + assertEquals(ImmutableMap.of("foo", 0), actualOutput); } }