Revert commit for e638c17.

The change caused a regression for leases without owners.
Added unit test.
This commit is contained in:
Sachin Sundar P S 2021-10-27 02:57:42 -07:00
parent 7503ec7105
commit 6ce02ae181
2 changed files with 75 additions and 8 deletions

View file

@ -543,16 +543,30 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
* @return map of workerIdentifier to lease count * @return map of workerIdentifier to lease count
*/ */
private Map<String, Integer> computeLeaseCounts(List<Lease> expiredLeases) { private Map<String, Integer> computeLeaseCounts(List<Lease> expiredLeases) {
Map<String, Integer> leaseCounts = new HashMap<>();
// The set will give much faster lookup than the original list, an // The set will give much faster lookup than the original list, an
// important optimization when the list is large // important optimization when the list is large
Set<Lease> expiredLeasesSet = new HashSet<>(expiredLeases); Set<Lease> expiredLeasesSet = new HashSet<>(expiredLeases);
Map<String, Integer> leaseCounts = allLeases.values().stream() // Compute the number of leases per worker by looking through allLeases and ignoring leases that have expired.
.filter(lease -> !expiredLeasesSet.contains(lease)) for (Lease lease : allLeases.values()) {
.collect(groupingBy(Lease::leaseOwner, summingInt(lease -> 1))); if (!expiredLeasesSet.contains(lease)) {
String leaseOwner = lease.leaseOwner();
Integer oldCount = leaseCounts.get(leaseOwner);
if (oldCount == null) {
leaseCounts.put(leaseOwner, 1);
} else {
leaseCounts.put(leaseOwner, oldCount + 1);
}
}
}
// If I have no leases, I won't be represented in leaseCounts. Let's fix that. // If I have no leases, I wasn't represented in leaseCounts. Let's fix that.
leaseCounts.putIfAbsent(workerIdentifier, 0); Integer myCount = leaseCounts.get(workerIdentifier);
if (myCount == null) {
myCount = 0;
leaseCounts.put(workerIdentifier, myCount);
}
return leaseCounts; return leaseCounts;
} }

View file

@ -14,8 +14,14 @@
*/ */
package software.amazon.kinesis.leases.dynamodb; package software.amazon.kinesis.leases.dynamodb;
import com.google.common.collect.ImmutableMap;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import junit.framework.Assert; import junit.framework.Assert;
@ -24,13 +30,39 @@ import org.junit.AfterClass;
import org.junit.Before; import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseTaker; import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseTaker;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.NullMetricsScope;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
/** import static org.junit.Assert.assertEquals;
* import static org.mockito.Mockito.when;
*/
@RunWith(MockitoJUnitRunner.class)
public class DynamoDBLeaseTakerTest { public class DynamoDBLeaseTakerTest {
private static final String WORKER_IDENTIFIER = "foo";
private static final long LEASE_DURATION_MILLIS = 1000L;
private DynamoDBLeaseTaker dynamoDBLeaseTaker;
@Mock
private LeaseRefresher leaseRefresher;
@Mock
private MetricsFactory metricsFactory;
@Mock
private Callable<Long> timeProvider;
@Before
public void setup() {
this.dynamoDBLeaseTaker = new DynamoDBLeaseTaker(leaseRefresher, WORKER_IDENTIFIER, LEASE_DURATION_MILLIS, metricsFactory);
}
/** /**
* @throws java.lang.Exception * @throws java.lang.Exception
*/ */
@ -73,4 +105,25 @@ public class DynamoDBLeaseTakerTest {
Assert.assertEquals("foo, bar", DynamoDBLeaseTaker.stringJoin(strings, ", ")); Assert.assertEquals("foo, bar", DynamoDBLeaseTaker.stringJoin(strings, ", "));
} }
@Test
public void test_computeLeaseCounts() throws Exception {
Lease lease = new Lease();
lease.checkpoint(new ExtendedSequenceNumber("checkpoint"));
lease.ownerSwitchesSinceCheckpoint(0L);
lease.leaseCounter(0L);
lease.leaseOwner(null);
lease.parentShardIds(Collections.singleton("parentShardId"));
lease.childShardIds(new HashSet<>());
lease.leaseKey("1");
final List<Lease> leases = Collections.singletonList(lease);
when(leaseRefresher.listLeases()).thenReturn(leases);
when(metricsFactory.createMetrics()).thenReturn(new NullMetricsScope());
when(timeProvider.call()).thenReturn(1000L);
Map<String, Lease> actualOutput = dynamoDBLeaseTaker.takeLeases(timeProvider);
assertEquals(ImmutableMap.of(), actualOutput);
}
} }