* Revert commit for e638c17.
The change caused a regression for leases without owners.
Added unit tests.
This commit is contained in:
parent
7503ec7105
commit
96e3345496
3 changed files with 122 additions and 13 deletions
BIN
.log.swp
Normal file
BIN
.log.swp
Normal file
Binary file not shown.
|
|
@ -14,6 +14,8 @@
|
|||
*/
|
||||
package software.amazon.kinesis.leases.dynamodb;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
|
|
@ -40,9 +42,6 @@ import software.amazon.kinesis.metrics.MetricsLevel;
|
|||
import software.amazon.kinesis.metrics.MetricsScope;
|
||||
import software.amazon.kinesis.metrics.MetricsUtil;
|
||||
|
||||
import static java.util.stream.Collectors.groupingBy;
|
||||
import static java.util.stream.Collectors.summingInt;
|
||||
|
||||
/**
|
||||
* An implementation of {@link LeaseTaker} that uses DynamoDB via {@link LeaseRefresher}.
|
||||
*/
|
||||
|
|
@ -63,13 +62,14 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
|
|||
private final long leaseDurationNanos;
|
||||
private final MetricsFactory metricsFactory;
|
||||
|
||||
private final Map<String, Lease> allLeases = new HashMap<>();
|
||||
// TODO: Remove these defaults and use the defaults in the config
|
||||
private int maxLeasesForWorker = Integer.MAX_VALUE;
|
||||
private int maxLeasesToStealAtOneTime = 1;
|
||||
|
||||
private long lastScanTimeNanos = 0L;
|
||||
|
||||
final Map<String, Lease> allLeases = new HashMap<>();
|
||||
|
||||
public DynamoDBLeaseTaker(LeaseRefresher leaseRefresher, String workerIdentifier, long leaseDurationMillis,
|
||||
final MetricsFactory metricsFactory) {
|
||||
this.leaseRefresher = leaseRefresher;
|
||||
|
|
@ -542,17 +542,32 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
|
|||
* @param expiredLeases list of leases that are currently expired
|
||||
* @return map of workerIdentifier to lease count
|
||||
*/
|
||||
private Map<String, Integer> computeLeaseCounts(List<Lease> expiredLeases) {
|
||||
@VisibleForTesting
|
||||
Map<String, Integer> computeLeaseCounts(List<Lease> expiredLeases) {
|
||||
Map<String, Integer> leaseCounts = new HashMap<>();
|
||||
// The set will give much faster lookup than the original list, an
|
||||
// important optimization when the list is large
|
||||
Set<Lease> expiredLeasesSet = new HashSet<>(expiredLeases);
|
||||
|
||||
Map<String, Integer> leaseCounts = allLeases.values().stream()
|
||||
.filter(lease -> !expiredLeasesSet.contains(lease))
|
||||
.collect(groupingBy(Lease::leaseOwner, summingInt(lease -> 1)));
|
||||
// Compute the number of leases per worker by looking through allLeases and ignoring leases that have expired.
|
||||
for (Lease lease : allLeases.values()) {
|
||||
if (!expiredLeasesSet.contains(lease)) {
|
||||
String leaseOwner = lease.leaseOwner();
|
||||
Integer oldCount = leaseCounts.get(leaseOwner);
|
||||
if (oldCount == null) {
|
||||
leaseCounts.put(leaseOwner, 1);
|
||||
} else {
|
||||
leaseCounts.put(leaseOwner, oldCount + 1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If I have no leases, I won't be represented in leaseCounts. Let's fix that.
|
||||
leaseCounts.putIfAbsent(workerIdentifier, 0);
|
||||
// If I have no leases, I wasn't represented in leaseCounts. Let's fix that.
|
||||
Integer myCount = leaseCounts.get(workerIdentifier);
|
||||
if (myCount == null) {
|
||||
myCount = 0;
|
||||
leaseCounts.put(workerIdentifier, myCount);
|
||||
}
|
||||
|
||||
return leaseCounts;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -14,8 +14,19 @@
|
|||
*/
|
||||
package software.amazon.kinesis.leases.dynamodb;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import junit.framework.Assert;
|
||||
|
||||
|
|
@ -24,13 +35,39 @@ import org.junit.AfterClass;
|
|||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.runners.MockitoJUnitRunner;
|
||||
import software.amazon.kinesis.leases.Lease;
|
||||
import software.amazon.kinesis.leases.LeaseRefresher;
|
||||
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseTaker;
|
||||
import software.amazon.kinesis.metrics.MetricsFactory;
|
||||
import software.amazon.kinesis.metrics.NullMetricsScope;
|
||||
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
public class DynamoDBLeaseTakerTest {
|
||||
|
||||
private static final String WORKER_IDENTIFIER = "foo";
|
||||
private static final long LEASE_DURATION_MILLIS = 1000L;
|
||||
|
||||
private DynamoDBLeaseTaker dynamoDBLeaseTaker;
|
||||
|
||||
@Mock
|
||||
private LeaseRefresher leaseRefresher;
|
||||
@Mock
|
||||
private MetricsFactory metricsFactory;
|
||||
@Mock
|
||||
private Callable<Long> timeProvider;
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
this.dynamoDBLeaseTaker = new DynamoDBLeaseTaker(leaseRefresher, WORKER_IDENTIFIER, LEASE_DURATION_MILLIS, metricsFactory);
|
||||
}
|
||||
|
||||
/**
|
||||
* @throws java.lang.Exception
|
||||
*/
|
||||
|
|
@ -73,4 +110,61 @@ public class DynamoDBLeaseTakerTest {
|
|||
Assert.assertEquals("foo, bar", DynamoDBLeaseTaker.stringJoin(strings, ", "));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_computeLeaseCounts_noExpiredLease() throws Exception {
|
||||
final List<Lease> leases = new ImmutableList.Builder<Lease>()
|
||||
.add(createLease(null, "1"))
|
||||
.add(createLease("foo", "2"))
|
||||
.add(createLease("bar", "3"))
|
||||
.add(createLease("baz", "4"))
|
||||
.build();
|
||||
dynamoDBLeaseTaker.allLeases.putAll(
|
||||
leases.stream().collect(Collectors.toMap(Lease::leaseKey, Function.identity())));
|
||||
|
||||
when(leaseRefresher.listLeases()).thenReturn(leases);
|
||||
when(metricsFactory.createMetrics()).thenReturn(new NullMetricsScope());
|
||||
when(timeProvider.call()).thenReturn(1000L);
|
||||
|
||||
final Map<String, Integer> actualOutput = dynamoDBLeaseTaker.computeLeaseCounts(ImmutableList.of());
|
||||
|
||||
final Map<String, Integer> expectedOutput = new HashMap<>();
|
||||
expectedOutput.put(null, 1);
|
||||
expectedOutput.put("foo", 1);
|
||||
expectedOutput.put("bar", 1);
|
||||
expectedOutput.put("baz", 1);
|
||||
assertEquals(expectedOutput, actualOutput);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_computeLeaseCounts_withExpiredLease() throws Exception {
|
||||
final List<Lease> leases = new ImmutableList.Builder<Lease>()
|
||||
.add(createLease("foo", "2"))
|
||||
.add(createLease("bar", "3"))
|
||||
.add(createLease("baz", "4"))
|
||||
.build();
|
||||
dynamoDBLeaseTaker.allLeases.putAll(
|
||||
leases.stream().collect(Collectors.toMap(Lease::leaseKey, Function.identity())));
|
||||
|
||||
when(leaseRefresher.listLeases()).thenReturn(leases);
|
||||
when(metricsFactory.createMetrics()).thenReturn(new NullMetricsScope());
|
||||
when(timeProvider.call()).thenReturn(1000L);
|
||||
|
||||
final Map<String, Integer> actualOutput = dynamoDBLeaseTaker.computeLeaseCounts(leases);
|
||||
|
||||
final Map<String, Integer> expectedOutput = new HashMap<>();
|
||||
expectedOutput.put("foo", 0);
|
||||
assertEquals(expectedOutput, actualOutput);
|
||||
}
|
||||
|
||||
private Lease createLease(String leaseOwner, String leaseKey) {
|
||||
final Lease lease = new Lease();
|
||||
lease.checkpoint(new ExtendedSequenceNumber("checkpoint"));
|
||||
lease.ownerSwitchesSinceCheckpoint(0L);
|
||||
lease.leaseCounter(0L);
|
||||
lease.leaseOwner(leaseOwner);
|
||||
lease.parentShardIds(Collections.singleton("parentShardId"));
|
||||
lease.childShardIds(new HashSet<>());
|
||||
lease.leaseKey(leaseKey);
|
||||
return lease;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue