diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/HashKeyRangeForLease.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/HashKeyRangeForLease.java index 063451a0..d2540073 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/HashKeyRangeForLease.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/HashKeyRangeForLease.java @@ -32,6 +32,13 @@ public class HashKeyRangeForLease { private final BigInteger startingHashKey; private final BigInteger endingHashKey; + public HashKeyRangeForLease(BigInteger startingHashKey, BigInteger endingHashKey) { + Validate.isTrue(startingHashKey.compareTo(endingHashKey) < 0, + "StartingHashKey %s must be less than EndingHashKey %s ", startingHashKey, endingHashKey); + this.startingHashKey = startingHashKey; + this.endingHashKey = endingHashKey; + } + /** * Serialize the startingHashKey for persisting in external storage * diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java index a96bf01e..5ac4647c 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java @@ -15,6 +15,7 @@ package software.amazon.kinesis.coordinator; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ComparisonChain; import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.NonNull; @@ -245,7 +246,8 @@ class PeriodicShardSyncManager { private final String reasonForDecision; } - private Optional hasHoleInLeases(StreamIdentifier streamIdentifier, List leases) { + @VisibleForTesting + Optional hasHoleInLeases(StreamIdentifier streamIdentifier, List leases) { // Filter the leases with any checkpoint other than shard end. List activeLeases = leases.stream() .filter(lease -> lease.checkpoint() != null && !lease.checkpoint().isShardEnd()).collect(Collectors.toList()); @@ -385,13 +387,16 @@ class PeriodicShardSyncManager { private static final long serialVersionUID = 1L; - @Override public int compare(Lease lease, Lease otherLease) { + @Override + public int compare(Lease lease, Lease otherLease) { Validate.notNull(lease); Validate.notNull(otherLease); Validate.notNull(lease.hashKeyRangeForLease()); Validate.notNull(otherLease.hashKeyRangeForLease()); - return lease.hashKeyRangeForLease().startingHashKey() - .compareTo(otherLease.hashKeyRangeForLease().startingHashKey()); + return ComparisonChain.start() + .compare(lease.hashKeyRangeForLease().startingHashKey(), otherLease.hashKeyRangeForLease().startingHashKey()) + .compare(lease.hashKeyRangeForLease().endingHashKey(), otherLease.hashKeyRangeForLease().endingHashKey()) + .result(); } } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index 91125a06..e2f2f852 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -352,8 +352,6 @@ public class Scheduler implements Runnable { } log.info("Scheduling periodicShardSync"); leaderElectedPeriodicShardSyncManager.start(); - // TODO: enable periodicShardSync after https://github.com/jushkem/amazon-kinesis-client/pull/2 is merged - // TODO: Determine if waitUntilHashRangeCovered() is needed. streamSyncWatch.start(); isDone = true; } catch (LeasingException e) { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManagerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManagerTest.java index 9577e7a8..dfba2791 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManagerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManagerTest.java @@ -15,6 +15,8 @@ package software.amazon.kinesis.coordinator; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -23,6 +25,7 @@ import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; import software.amazon.awssdk.services.kinesis.model.HashKeyRange; import software.amazon.awssdk.services.kinesis.model.Shard; +import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.kinesis.common.HashKeyRangeForLease; import software.amazon.kinesis.common.StreamConfig; import software.amazon.kinesis.common.StreamIdentifier; @@ -33,7 +36,9 @@ import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardSyncTaskManager; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; +import java.math.BigInteger; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.function.Function; @@ -106,6 +111,44 @@ public class PeriodicShardSyncManagerTest { .checkForHoleInHashKeyRanges(streamIdentifier, hashRanges).isPresent()); } + @Test + public void testForSuccessWhenUnSortedHashRangesAreComplete() { + List hashRanges = new ArrayList() {{ + add(deserialize("4", "23")); + add(deserialize("2", "3")); + add(deserialize("0", "1")); + add(deserialize("24", MAX_HASH_KEY.toString())); + add(deserialize("6", "23")); + + }}.stream().map(hashKeyRangeForLease -> { + Lease lease = new MultiStreamLease(); + lease.hashKeyRange(hashKeyRangeForLease); + lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); + return lease; + }).collect(Collectors.toList()); + Assert.assertFalse(PeriodicShardSyncManager + .checkForHoleInHashKeyRanges(streamIdentifier, hashRanges).isPresent()); + } + + @Test + public void testForSuccessWhenHashRangesAreCompleteForOverlappingLeasesAtEnd() { + List hashRanges = new ArrayList() {{ + add(deserialize("0", "1")); + add(deserialize("2", "3")); + add(deserialize("4", "23")); + add(deserialize("6", "23")); + add(deserialize("24", MAX_HASH_KEY.toString())); + add(deserialize("24", "45")); + }}.stream().map(hashKeyRangeForLease -> { + Lease lease = new MultiStreamLease(); + lease.hashKeyRange(hashKeyRangeForLease); + lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); + return lease; + }).collect(Collectors.toList()); + Assert.assertFalse(PeriodicShardSyncManager + .checkForHoleInHashKeyRanges(streamIdentifier, hashRanges).isPresent()); + } + @Test public void testIfShardSyncIsInitiatedWhenNoLeasesArePassed() { Assert.assertTrue(periodicShardSyncManager.checkForShardSync(streamIdentifier, null).shouldDoShardSync()); @@ -135,7 +178,7 @@ public class PeriodicShardSyncManagerTest { } @Test - public void testIfShardSyncIsNotInitiatedWhenConfidenceFactorIsReached() { + public void testIfShardSyncIsInitiatedWhenConfidenceFactorIsReached() { List multiStreamLeases = new ArrayList() {{ add(deserialize(MIN_HASH_KEY.toString(), "1")); add(deserialize("2", "3")); @@ -362,4 +405,184 @@ public class PeriodicShardSyncManagerTest { } } + @Test + public void testFor1000DifferentValidSplitHierarchyTreeTheHashRangesAreAlwaysComplete() { + for(int i=0; i < 1000; i++) { + int maxInitialLeaseCount = 100; + List leases = generateInitialLeases(maxInitialLeaseCount); + reshard(leases, 5, ReshardType.SPLIT, maxInitialLeaseCount, false); + Collections.shuffle(leases); +// System.out.println( +// leases.stream().map(l -> l.checkpoint().sequenceNumber() + ":" + l.hashKeyRangeForLease()).collect(Collectors.toList())); + Assert.assertFalse(periodicShardSyncManager.hasHoleInLeases(streamIdentifier, leases).isPresent()); + } + } + + @Test + public void testFor1000DifferentValidMergeHierarchyTreeTheHashRangesAreAlwaysComplete() { + for (int i = 0; i < 1000; i++) { + int maxInitialLeaseCount = 100; + List leases = generateInitialLeases(maxInitialLeaseCount); + reshard(leases, 5, ReshardType.MERGE, maxInitialLeaseCount, false); + Collections.shuffle(leases); + Assert.assertFalse(periodicShardSyncManager.hasHoleInLeases(streamIdentifier, leases).isPresent()); + } + } + + @Test + public void testFor1000DifferentValidReshardHierarchyTreeTheHashRangesAreAlwaysComplete() { + for (int i = 0; i < 1000; i++) { + int maxInitialLeaseCount = 100; + List leases = generateInitialLeases(maxInitialLeaseCount); + reshard(leases, 5, ReshardType.ANY, maxInitialLeaseCount, false); + Collections.shuffle(leases); + Assert.assertFalse(periodicShardSyncManager.hasHoleInLeases(streamIdentifier, leases).isPresent()); + } + } + + @Test + public void testFor1000DifferentValidMergeHierarchyTreeWithSomeInProgressParentsTheHashRangesAreAlwaysComplete() { + for (int i = 0; i < 1000; i++) { + int maxInitialLeaseCount = 100; + List leases = generateInitialLeases(maxInitialLeaseCount); + reshard(leases, 5, ReshardType.MERGE, maxInitialLeaseCount, true); + Collections.shuffle(leases); + Assert.assertFalse(periodicShardSyncManager.hasHoleInLeases(streamIdentifier, leases).isPresent()); + } + } + + @Test + public void testFor1000DifferentValidReshardHierarchyTreeWithSomeInProgressParentsTheHashRangesAreAlwaysComplete() { + for (int i = 0; i < 1000; i++) { + int maxInitialLeaseCount = 100; + List leases = generateInitialLeases(maxInitialLeaseCount); + reshard(leases, 5, ReshardType.ANY, maxInitialLeaseCount, true); + Collections.shuffle(leases); + Assert.assertFalse(periodicShardSyncManager.hasHoleInLeases(streamIdentifier, leases).isPresent()); + } + } + + + + private List generateInitialLeases(int initialShardCount) { + long hashRangeInternalMax = 10000000; + List initialLeases = new ArrayList<>(); + long leaseStartKey = 0; + for (int i = 1; i <= initialShardCount; i++) { + final Lease lease = new Lease(); + long leaseEndKey; + if (i != initialShardCount) { + leaseEndKey = (hashRangeInternalMax / initialShardCount) * i; + lease.hashKeyRange(HashKeyRangeForLease.deserialize(leaseStartKey + "", leaseEndKey + "")); + } else { + leaseEndKey = 0; + lease.hashKeyRange(HashKeyRangeForLease.deserialize(leaseStartKey + "", MAX_HASH_KEY.toString())); + } + lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); + lease.leaseKey("shard-" + i); + initialLeases.add(lease); + leaseStartKey = leaseEndKey + 1; + } + return initialLeases; + } + + private void reshard(List initialLeases, int depth, ReshardType reshardType, int leaseCounter, + boolean shouldKeepSomeParentsInProgress) { + for (int i = 0; i < depth; i++) { + if (reshardType == ReshardType.SPLIT) { + leaseCounter = split(initialLeases, leaseCounter); + } else if (reshardType == ReshardType.MERGE) { + leaseCounter = merge(initialLeases, leaseCounter, shouldKeepSomeParentsInProgress); + } else { + if (isHeads()) { + leaseCounter = split(initialLeases, leaseCounter); + } else { + leaseCounter = merge(initialLeases, leaseCounter, shouldKeepSomeParentsInProgress); + } + } + } + } + + private int merge(List initialLeases, int leaseCounter, boolean shouldKeepSomeParentsInProgress) { + List leasesEligibleForMerge = initialLeases.stream().filter(l -> CollectionUtils.isNullOrEmpty(l.childShardIds())) + .collect(Collectors.toList()); +// System.out.println("Leases to merge : " + leasesEligibleForMerge); + int leasesToMerge = (int) ((leasesEligibleForMerge.size() - 1) / 2.0 * Math.random()); + for (int i = 0; i < leasesToMerge; i += 2) { + Lease parent1 = leasesEligibleForMerge.get(i); + Lease parent2 = leasesEligibleForMerge.get(i + 1); + if(parent2.hashKeyRangeForLease().startingHashKey().subtract(parent1.hashKeyRangeForLease().endingHashKey()).equals(BigInteger.ONE)) + { + parent1.checkpoint(ExtendedSequenceNumber.SHARD_END); + if (!shouldKeepSomeParentsInProgress || (shouldKeepSomeParentsInProgress && isOneFromDiceRoll())) { +// System.out.println("Deciding to keep parent in progress : " + parent2); + parent2.checkpoint(ExtendedSequenceNumber.SHARD_END); + } + Lease child = new Lease(); + child.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); + child.leaseKey("shard-" + (++leaseCounter)); +// System.out.println("Parent " + parent1 + " and " + parent2 + " merges into " + child); + child.hashKeyRange(new HashKeyRangeForLease(parent1.hashKeyRangeForLease().startingHashKey(), + parent2.hashKeyRangeForLease().endingHashKey())); + parent1.childShardIds(Collections.singletonList(child.leaseKey())); + parent2.childShardIds(Collections.singletonList(child.leaseKey())); + child.parentShardIds(Sets.newHashSet(parent1.leaseKey(), parent2.leaseKey())); + + initialLeases.add(child); + } + } + return leaseCounter; + } + + private int split(List initialLeases, int leaseCounter) { + List leasesEligibleForSplit = initialLeases.stream().filter(l -> CollectionUtils.isNullOrEmpty(l.childShardIds())) + .collect(Collectors.toList()); +// System.out.println("Leases to split : " + leasesEligibleForSplit); + int leasesToSplit = (int) (leasesEligibleForSplit.size() * Math.random()); + for (int i = 0; i < leasesToSplit; i++) { + Lease parent = leasesEligibleForSplit.get(i); + parent.checkpoint(ExtendedSequenceNumber.SHARD_END); + Lease child1 = new Lease(); + child1.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); + child1.hashKeyRange(new HashKeyRangeForLease(parent.hashKeyRangeForLease().startingHashKey(), + parent.hashKeyRangeForLease().startingHashKey().add(parent.hashKeyRangeForLease().endingHashKey()) + .divide(new BigInteger("2")))); + child1.leaseKey("shard-" + (++leaseCounter)); + Lease child2 = new Lease(); + child2.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); + child2.hashKeyRange(new HashKeyRangeForLease( + parent.hashKeyRangeForLease().startingHashKey().add(parent.hashKeyRangeForLease().endingHashKey()) + .divide(new BigInteger("2")).add(new BigInteger("1")), + parent.hashKeyRangeForLease().endingHashKey())); + child2.leaseKey("shard-" + (++leaseCounter)); + + child1.parentShardIds(Sets.newHashSet(parent.leaseKey())); + child2.parentShardIds(Sets.newHashSet(parent.leaseKey())); + parent.childShardIds(Lists.newArrayList(child1.leaseKey(), child2.leaseKey())); + +// System.out.println("Parent " + parent + " splits into " + child1 + " and " + child2); + + initialLeases.add(child1); + initialLeases.add(child2); + } + return leaseCounter; + } + + private boolean isHeads() { + return Math.random() <= 0.5; + } + + private boolean isOneFromDiceRoll() { + return Math.random() <= 0.16; + } + + + private enum ReshardType { + SPLIT, + MERGE, + ANY + } + + + }