diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java index 06b2527b..9c2a3d03 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java @@ -543,16 +543,30 @@ public class DynamoDBLeaseTaker implements LeaseTaker { * @return map of workerIdentifier to lease count */ private Map computeLeaseCounts(List expiredLeases) { + Map leaseCounts = new HashMap<>(); // The set will give much faster lookup than the original list, an // important optimization when the list is large Set expiredLeasesSet = new HashSet<>(expiredLeases); - Map leaseCounts = allLeases.values().stream() - .filter(lease -> !expiredLeasesSet.contains(lease)) - .collect(groupingBy(Lease::leaseOwner, summingInt(lease -> 1))); + // Compute the number of leases per worker by looking through allLeases and ignoring leases that have expired. + for (Lease lease : allLeases.values()) { + if (!expiredLeasesSet.contains(lease)) { + String leaseOwner = lease.leaseOwner(); + Integer oldCount = leaseCounts.get(leaseOwner); + if (oldCount == null) { + leaseCounts.put(leaseOwner, 1); + } else { + leaseCounts.put(leaseOwner, oldCount + 1); + } + } + } - // If I have no leases, I won't be represented in leaseCounts. Let's fix that. - leaseCounts.putIfAbsent(workerIdentifier, 0); + // If I have no leases, I wasn't represented in leaseCounts. Let's fix that. + Integer myCount = leaseCounts.get(workerIdentifier); + if (myCount == null) { + myCount = 0; + leaseCounts.put(workerIdentifier, myCount); + } return leaseCounts; } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerTest.java index 92dc28a2..016ebea6 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerTest.java @@ -14,8 +14,14 @@ */ package software.amazon.kinesis.leases.dynamodb; +import com.google.common.collect.ImmutableMap; + import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; import junit.framework.Assert; @@ -24,13 +30,39 @@ import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; +import software.amazon.kinesis.leases.Lease; +import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseTaker; +import software.amazon.kinesis.metrics.MetricsFactory; +import software.amazon.kinesis.metrics.NullMetricsScope; +import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; -/** - * - */ +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) public class DynamoDBLeaseTakerTest { + private static final String WORKER_IDENTIFIER = "foo"; + private static final long LEASE_DURATION_MILLIS = 1000L; + + private DynamoDBLeaseTaker dynamoDBLeaseTaker; + + @Mock + private LeaseRefresher leaseRefresher; + @Mock + private MetricsFactory metricsFactory; + @Mock + private Callable timeProvider; + + @Before + public void setup() { + this.dynamoDBLeaseTaker = new DynamoDBLeaseTaker(leaseRefresher, WORKER_IDENTIFIER, LEASE_DURATION_MILLIS, metricsFactory); + } + /** * @throws java.lang.Exception */ @@ -73,4 +105,25 @@ public class DynamoDBLeaseTakerTest { Assert.assertEquals("foo, bar", DynamoDBLeaseTaker.stringJoin(strings, ", ")); } + + @Test + public void test_computeLeaseCounts() throws Exception { + Lease lease = new Lease(); + lease.checkpoint(new ExtendedSequenceNumber("checkpoint")); + lease.ownerSwitchesSinceCheckpoint(0L); + lease.leaseCounter(0L); + lease.leaseOwner(null); + lease.parentShardIds(Collections.singleton("parentShardId")); + lease.childShardIds(new HashSet<>()); + lease.leaseKey("1"); + final List leases = Collections.singletonList(lease); + + when(leaseRefresher.listLeases()).thenReturn(leases); + when(metricsFactory.createMetrics()).thenReturn(new NullMetricsScope()); + when(timeProvider.call()).thenReturn(1000L); + + Map actualOutput = dynamoDBLeaseTaker.takeLeases(timeProvider); + + assertEquals(ImmutableMap.of(), actualOutput); + } }