diff --git a/.log.swp b/.log.swp new file mode 100644 index 00000000..bdb60bb3 Binary files /dev/null and b/.log.swp differ diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java index 06b2527b..e7087e62 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java @@ -14,6 +14,8 @@ */ package software.amazon.kinesis.leases.dynamodb; +import com.google.common.annotations.VisibleForTesting; + import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -40,9 +42,6 @@ import software.amazon.kinesis.metrics.MetricsLevel; import software.amazon.kinesis.metrics.MetricsScope; import software.amazon.kinesis.metrics.MetricsUtil; -import static java.util.stream.Collectors.groupingBy; -import static java.util.stream.Collectors.summingInt; - /** * An implementation of {@link LeaseTaker} that uses DynamoDB via {@link LeaseRefresher}. */ @@ -63,13 +62,14 @@ public class DynamoDBLeaseTaker implements LeaseTaker { private final long leaseDurationNanos; private final MetricsFactory metricsFactory; - private final Map allLeases = new HashMap<>(); // TODO: Remove these defaults and use the defaults in the config private int maxLeasesForWorker = Integer.MAX_VALUE; private int maxLeasesToStealAtOneTime = 1; private long lastScanTimeNanos = 0L; + final Map allLeases = new HashMap<>(); + public DynamoDBLeaseTaker(LeaseRefresher leaseRefresher, String workerIdentifier, long leaseDurationMillis, final MetricsFactory metricsFactory) { this.leaseRefresher = leaseRefresher; @@ -542,17 +542,32 @@ public class DynamoDBLeaseTaker implements LeaseTaker { * @param expiredLeases list of leases that are currently expired * @return map of workerIdentifier to lease count */ - private Map computeLeaseCounts(List expiredLeases) { + @VisibleForTesting + Map computeLeaseCounts(List expiredLeases) { + Map leaseCounts = new HashMap<>(); // The set will give much faster lookup than the original list, an // important optimization when the list is large Set expiredLeasesSet = new HashSet<>(expiredLeases); - Map leaseCounts = allLeases.values().stream() - .filter(lease -> !expiredLeasesSet.contains(lease)) - .collect(groupingBy(Lease::leaseOwner, summingInt(lease -> 1))); + // Compute the number of leases per worker by looking through allLeases and ignoring leases that have expired. + for (Lease lease : allLeases.values()) { + if (!expiredLeasesSet.contains(lease)) { + String leaseOwner = lease.leaseOwner(); + Integer oldCount = leaseCounts.get(leaseOwner); + if (oldCount == null) { + leaseCounts.put(leaseOwner, 1); + } else { + leaseCounts.put(leaseOwner, oldCount + 1); + } + } + } - // If I have no leases, I won't be represented in leaseCounts. Let's fix that. - leaseCounts.putIfAbsent(workerIdentifier, 0); + // If I have no leases, I wasn't represented in leaseCounts. Let's fix that. + Integer myCount = leaseCounts.get(workerIdentifier); + if (myCount == null) { + myCount = 0; + leaseCounts.put(workerIdentifier, myCount); + } return leaseCounts; } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerTest.java index 92dc28a2..193970f6 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerTest.java @@ -14,8 +14,19 @@ */ package software.amazon.kinesis.leases.dynamodb; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.function.Function; +import java.util.stream.Collectors; import junit.framework.Assert; @@ -24,13 +35,39 @@ import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; +import software.amazon.kinesis.leases.Lease; +import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseTaker; +import software.amazon.kinesis.metrics.MetricsFactory; +import software.amazon.kinesis.metrics.NullMetricsScope; +import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; -/** - * - */ +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) public class DynamoDBLeaseTakerTest { + private static final String WORKER_IDENTIFIER = "foo"; + private static final long LEASE_DURATION_MILLIS = 1000L; + + private DynamoDBLeaseTaker dynamoDBLeaseTaker; + + @Mock + private LeaseRefresher leaseRefresher; + @Mock + private MetricsFactory metricsFactory; + @Mock + private Callable timeProvider; + + @Before + public void setup() { + this.dynamoDBLeaseTaker = new DynamoDBLeaseTaker(leaseRefresher, WORKER_IDENTIFIER, LEASE_DURATION_MILLIS, metricsFactory); + } + /** * @throws java.lang.Exception */ @@ -73,4 +110,61 @@ public class DynamoDBLeaseTakerTest { Assert.assertEquals("foo, bar", DynamoDBLeaseTaker.stringJoin(strings, ", ")); } + @Test + public void test_computeLeaseCounts_noExpiredLease() throws Exception { + final List leases = new ImmutableList.Builder() + .add(createLease(null, "1")) + .add(createLease("foo", "2")) + .add(createLease("bar", "3")) + .add(createLease("baz", "4")) + .build(); + dynamoDBLeaseTaker.allLeases.putAll( + leases.stream().collect(Collectors.toMap(Lease::leaseKey, Function.identity()))); + + when(leaseRefresher.listLeases()).thenReturn(leases); + when(metricsFactory.createMetrics()).thenReturn(new NullMetricsScope()); + when(timeProvider.call()).thenReturn(1000L); + + final Map actualOutput = dynamoDBLeaseTaker.computeLeaseCounts(ImmutableList.of()); + + final Map expectedOutput = new HashMap<>(); + expectedOutput.put(null, 1); + expectedOutput.put("foo", 1); + expectedOutput.put("bar", 1); + expectedOutput.put("baz", 1); + assertEquals(expectedOutput, actualOutput); + } + + @Test + public void test_computeLeaseCounts_withExpiredLease() throws Exception { + final List leases = new ImmutableList.Builder() + .add(createLease("foo", "2")) + .add(createLease("bar", "3")) + .add(createLease("baz", "4")) + .build(); + dynamoDBLeaseTaker.allLeases.putAll( + leases.stream().collect(Collectors.toMap(Lease::leaseKey, Function.identity()))); + + when(leaseRefresher.listLeases()).thenReturn(leases); + when(metricsFactory.createMetrics()).thenReturn(new NullMetricsScope()); + when(timeProvider.call()).thenReturn(1000L); + + final Map actualOutput = dynamoDBLeaseTaker.computeLeaseCounts(leases); + + final Map expectedOutput = new HashMap<>(); + expectedOutput.put("foo", 0); + assertEquals(expectedOutput, actualOutput); + } + + private Lease createLease(String leaseOwner, String leaseKey) { + final Lease lease = new Lease(); + lease.checkpoint(new ExtendedSequenceNumber("checkpoint")); + lease.ownerSwitchesSinceCheckpoint(0L); + lease.leaseCounter(0L); + lease.leaseOwner(leaseOwner); + lease.parentShardIds(Collections.singleton("parentShardId")); + lease.childShardIds(new HashSet<>()); + lease.leaseKey(leaseKey); + return lease; + } }