Adding more unit test cases and fixing an edge case

This commit is contained in:
Ashwin Giridharan 2020-05-25 13:25:43 -07:00
parent fc4781e347
commit 72a6d5e084
4 changed files with 240 additions and 7 deletions

View file

@ -32,6 +32,13 @@ public class HashKeyRangeForLease {
private final BigInteger startingHashKey;
private final BigInteger endingHashKey;
public HashKeyRangeForLease(BigInteger startingHashKey, BigInteger endingHashKey) {
Validate.isTrue(startingHashKey.compareTo(endingHashKey) < 0,
"StartingHashKey %s must be less than EndingHashKey %s ", startingHashKey, endingHashKey);
this.startingHashKey = startingHashKey;
this.endingHashKey = endingHashKey;
}
/**
* Serialize the startingHashKey for persisting in external storage
*

View file

@ -15,6 +15,7 @@
package software.amazon.kinesis.coordinator;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ComparisonChain;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NonNull;
@ -245,7 +246,8 @@ class PeriodicShardSyncManager {
private final String reasonForDecision;
}
private Optional<HashRangeHole> hasHoleInLeases(StreamIdentifier streamIdentifier, List<Lease> leases) {
@VisibleForTesting
Optional<HashRangeHole> hasHoleInLeases(StreamIdentifier streamIdentifier, List<Lease> leases) {
// Filter the leases with any checkpoint other than shard end.
List<Lease> activeLeases = leases.stream()
.filter(lease -> lease.checkpoint() != null && !lease.checkpoint().isShardEnd()).collect(Collectors.toList());
@ -385,13 +387,16 @@ class PeriodicShardSyncManager {
private static final long serialVersionUID = 1L;
@Override public int compare(Lease lease, Lease otherLease) {
@Override
public int compare(Lease lease, Lease otherLease) {
Validate.notNull(lease);
Validate.notNull(otherLease);
Validate.notNull(lease.hashKeyRangeForLease());
Validate.notNull(otherLease.hashKeyRangeForLease());
return lease.hashKeyRangeForLease().startingHashKey()
.compareTo(otherLease.hashKeyRangeForLease().startingHashKey());
return ComparisonChain.start()
.compare(lease.hashKeyRangeForLease().startingHashKey(), otherLease.hashKeyRangeForLease().startingHashKey())
.compare(lease.hashKeyRangeForLease().endingHashKey(), otherLease.hashKeyRangeForLease().endingHashKey())
.result();
}
}
}

View file

@ -352,8 +352,6 @@ public class Scheduler implements Runnable {
}
log.info("Scheduling periodicShardSync");
leaderElectedPeriodicShardSyncManager.start();
// TODO: enable periodicShardSync after https://github.com/jushkem/amazon-kinesis-client/pull/2 is merged
// TODO: Determine if waitUntilHashRangeCovered() is needed.
streamSyncWatch.start();
isDone = true;
} catch (LeasingException e) {

View file

@ -15,6 +15,8 @@
package software.amazon.kinesis.coordinator;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -23,6 +25,7 @@ import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.awssdk.services.kinesis.model.HashKeyRange;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.common.HashKeyRangeForLease;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier;
@ -33,7 +36,9 @@ import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardSyncTaskManager;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
@ -106,6 +111,44 @@ public class PeriodicShardSyncManagerTest {
.checkForHoleInHashKeyRanges(streamIdentifier, hashRanges).isPresent());
}
@Test
public void testForSuccessWhenUnSortedHashRangesAreComplete() {
List<Lease> hashRanges = new ArrayList<HashKeyRangeForLease>() {{
add(deserialize("4", "23"));
add(deserialize("2", "3"));
add(deserialize("0", "1"));
add(deserialize("24", MAX_HASH_KEY.toString()));
add(deserialize("6", "23"));
}}.stream().map(hashKeyRangeForLease -> {
Lease lease = new MultiStreamLease();
lease.hashKeyRange(hashKeyRangeForLease);
lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
return lease;
}).collect(Collectors.toList());
Assert.assertFalse(PeriodicShardSyncManager
.checkForHoleInHashKeyRanges(streamIdentifier, hashRanges).isPresent());
}
@Test
public void testForSuccessWhenHashRangesAreCompleteForOverlappingLeasesAtEnd() {
List<Lease> hashRanges = new ArrayList<HashKeyRangeForLease>() {{
add(deserialize("0", "1"));
add(deserialize("2", "3"));
add(deserialize("4", "23"));
add(deserialize("6", "23"));
add(deserialize("24", MAX_HASH_KEY.toString()));
add(deserialize("24", "45"));
}}.stream().map(hashKeyRangeForLease -> {
Lease lease = new MultiStreamLease();
lease.hashKeyRange(hashKeyRangeForLease);
lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
return lease;
}).collect(Collectors.toList());
Assert.assertFalse(PeriodicShardSyncManager
.checkForHoleInHashKeyRanges(streamIdentifier, hashRanges).isPresent());
}
@Test
public void testIfShardSyncIsInitiatedWhenNoLeasesArePassed() {
Assert.assertTrue(periodicShardSyncManager.checkForShardSync(streamIdentifier, null).shouldDoShardSync());
@ -135,7 +178,7 @@ public class PeriodicShardSyncManagerTest {
}
@Test
public void testIfShardSyncIsNotInitiatedWhenConfidenceFactorIsReached() {
public void testIfShardSyncIsInitiatedWhenConfidenceFactorIsReached() {
List<Lease> multiStreamLeases = new ArrayList<HashKeyRangeForLease>() {{
add(deserialize(MIN_HASH_KEY.toString(), "1"));
add(deserialize("2", "3"));
@ -362,4 +405,184 @@ public class PeriodicShardSyncManagerTest {
}
}
@Test
public void testFor1000DifferentValidSplitHierarchyTreeTheHashRangesAreAlwaysComplete() {
for(int i=0; i < 1000; i++) {
int maxInitialLeaseCount = 100;
List<Lease> leases = generateInitialLeases(maxInitialLeaseCount);
reshard(leases, 5, ReshardType.SPLIT, maxInitialLeaseCount, false);
Collections.shuffle(leases);
// System.out.println(
// leases.stream().map(l -> l.checkpoint().sequenceNumber() + ":" + l.hashKeyRangeForLease()).collect(Collectors.toList()));
Assert.assertFalse(periodicShardSyncManager.hasHoleInLeases(streamIdentifier, leases).isPresent());
}
}
@Test
public void testFor1000DifferentValidMergeHierarchyTreeTheHashRangesAreAlwaysComplete() {
for (int i = 0; i < 1000; i++) {
int maxInitialLeaseCount = 100;
List<Lease> leases = generateInitialLeases(maxInitialLeaseCount);
reshard(leases, 5, ReshardType.MERGE, maxInitialLeaseCount, false);
Collections.shuffle(leases);
Assert.assertFalse(periodicShardSyncManager.hasHoleInLeases(streamIdentifier, leases).isPresent());
}
}
@Test
public void testFor1000DifferentValidReshardHierarchyTreeTheHashRangesAreAlwaysComplete() {
for (int i = 0; i < 1000; i++) {
int maxInitialLeaseCount = 100;
List<Lease> leases = generateInitialLeases(maxInitialLeaseCount);
reshard(leases, 5, ReshardType.ANY, maxInitialLeaseCount, false);
Collections.shuffle(leases);
Assert.assertFalse(periodicShardSyncManager.hasHoleInLeases(streamIdentifier, leases).isPresent());
}
}
@Test
public void testFor1000DifferentValidMergeHierarchyTreeWithSomeInProgressParentsTheHashRangesAreAlwaysComplete() {
for (int i = 0; i < 1000; i++) {
int maxInitialLeaseCount = 100;
List<Lease> leases = generateInitialLeases(maxInitialLeaseCount);
reshard(leases, 5, ReshardType.MERGE, maxInitialLeaseCount, true);
Collections.shuffle(leases);
Assert.assertFalse(periodicShardSyncManager.hasHoleInLeases(streamIdentifier, leases).isPresent());
}
}
@Test
public void testFor1000DifferentValidReshardHierarchyTreeWithSomeInProgressParentsTheHashRangesAreAlwaysComplete() {
for (int i = 0; i < 1000; i++) {
int maxInitialLeaseCount = 100;
List<Lease> leases = generateInitialLeases(maxInitialLeaseCount);
reshard(leases, 5, ReshardType.ANY, maxInitialLeaseCount, true);
Collections.shuffle(leases);
Assert.assertFalse(periodicShardSyncManager.hasHoleInLeases(streamIdentifier, leases).isPresent());
}
}
private List<Lease> generateInitialLeases(int initialShardCount) {
long hashRangeInternalMax = 10000000;
List<Lease> initialLeases = new ArrayList<>();
long leaseStartKey = 0;
for (int i = 1; i <= initialShardCount; i++) {
final Lease lease = new Lease();
long leaseEndKey;
if (i != initialShardCount) {
leaseEndKey = (hashRangeInternalMax / initialShardCount) * i;
lease.hashKeyRange(HashKeyRangeForLease.deserialize(leaseStartKey + "", leaseEndKey + ""));
} else {
leaseEndKey = 0;
lease.hashKeyRange(HashKeyRangeForLease.deserialize(leaseStartKey + "", MAX_HASH_KEY.toString()));
}
lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
lease.leaseKey("shard-" + i);
initialLeases.add(lease);
leaseStartKey = leaseEndKey + 1;
}
return initialLeases;
}
private void reshard(List<Lease> initialLeases, int depth, ReshardType reshardType, int leaseCounter,
boolean shouldKeepSomeParentsInProgress) {
for (int i = 0; i < depth; i++) {
if (reshardType == ReshardType.SPLIT) {
leaseCounter = split(initialLeases, leaseCounter);
} else if (reshardType == ReshardType.MERGE) {
leaseCounter = merge(initialLeases, leaseCounter, shouldKeepSomeParentsInProgress);
} else {
if (isHeads()) {
leaseCounter = split(initialLeases, leaseCounter);
} else {
leaseCounter = merge(initialLeases, leaseCounter, shouldKeepSomeParentsInProgress);
}
}
}
}
private int merge(List<Lease> initialLeases, int leaseCounter, boolean shouldKeepSomeParentsInProgress) {
List<Lease> leasesEligibleForMerge = initialLeases.stream().filter(l -> CollectionUtils.isNullOrEmpty(l.childShardIds()))
.collect(Collectors.toList());
// System.out.println("Leases to merge : " + leasesEligibleForMerge);
int leasesToMerge = (int) ((leasesEligibleForMerge.size() - 1) / 2.0 * Math.random());
for (int i = 0; i < leasesToMerge; i += 2) {
Lease parent1 = leasesEligibleForMerge.get(i);
Lease parent2 = leasesEligibleForMerge.get(i + 1);
if(parent2.hashKeyRangeForLease().startingHashKey().subtract(parent1.hashKeyRangeForLease().endingHashKey()).equals(BigInteger.ONE))
{
parent1.checkpoint(ExtendedSequenceNumber.SHARD_END);
if (!shouldKeepSomeParentsInProgress || (shouldKeepSomeParentsInProgress && isOneFromDiceRoll())) {
// System.out.println("Deciding to keep parent in progress : " + parent2);
parent2.checkpoint(ExtendedSequenceNumber.SHARD_END);
}
Lease child = new Lease();
child.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
child.leaseKey("shard-" + (++leaseCounter));
// System.out.println("Parent " + parent1 + " and " + parent2 + " merges into " + child);
child.hashKeyRange(new HashKeyRangeForLease(parent1.hashKeyRangeForLease().startingHashKey(),
parent2.hashKeyRangeForLease().endingHashKey()));
parent1.childShardIds(Collections.singletonList(child.leaseKey()));
parent2.childShardIds(Collections.singletonList(child.leaseKey()));
child.parentShardIds(Sets.newHashSet(parent1.leaseKey(), parent2.leaseKey()));
initialLeases.add(child);
}
}
return leaseCounter;
}
private int split(List<Lease> initialLeases, int leaseCounter) {
List<Lease> leasesEligibleForSplit = initialLeases.stream().filter(l -> CollectionUtils.isNullOrEmpty(l.childShardIds()))
.collect(Collectors.toList());
// System.out.println("Leases to split : " + leasesEligibleForSplit);
int leasesToSplit = (int) (leasesEligibleForSplit.size() * Math.random());
for (int i = 0; i < leasesToSplit; i++) {
Lease parent = leasesEligibleForSplit.get(i);
parent.checkpoint(ExtendedSequenceNumber.SHARD_END);
Lease child1 = new Lease();
child1.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
child1.hashKeyRange(new HashKeyRangeForLease(parent.hashKeyRangeForLease().startingHashKey(),
parent.hashKeyRangeForLease().startingHashKey().add(parent.hashKeyRangeForLease().endingHashKey())
.divide(new BigInteger("2"))));
child1.leaseKey("shard-" + (++leaseCounter));
Lease child2 = new Lease();
child2.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
child2.hashKeyRange(new HashKeyRangeForLease(
parent.hashKeyRangeForLease().startingHashKey().add(parent.hashKeyRangeForLease().endingHashKey())
.divide(new BigInteger("2")).add(new BigInteger("1")),
parent.hashKeyRangeForLease().endingHashKey()));
child2.leaseKey("shard-" + (++leaseCounter));
child1.parentShardIds(Sets.newHashSet(parent.leaseKey()));
child2.parentShardIds(Sets.newHashSet(parent.leaseKey()));
parent.childShardIds(Lists.newArrayList(child1.leaseKey(), child2.leaseKey()));
// System.out.println("Parent " + parent + " splits into " + child1 + " and " + child2);
initialLeases.add(child1);
initialLeases.add(child2);
}
return leaseCounter;
}
private boolean isHeads() {
return Math.random() <= 0.5;
}
private boolean isOneFromDiceRoll() {
return Math.random() <= 0.16;
}
private enum ReshardType {
SPLIT,
MERGE,
ANY
}
}