diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java index 3979236b..a96bf01e 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java @@ -19,6 +19,7 @@ import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.NonNull; import lombok.Value; +import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.Validate; import software.amazon.awssdk.services.kinesis.model.Shard; @@ -161,25 +162,28 @@ class PeriodicShardSyncManager { // For each of the stream, check if shard sync needs to be done based on the leases state. for (Map.Entry streamConfigEntry : currentStreamConfigMap.entrySet()) { - if (shouldDoShardSync(streamConfigEntry.getKey(), - streamToLeasesMap.get(streamConfigEntry.getKey()))) { - log.info("Periodic shard syncer initiating shard sync for {}", streamConfigEntry.getKey()); + final ShardSyncResponse shardSyncResponse = checkForShardSync(streamConfigEntry.getKey(), + streamToLeasesMap.get(streamConfigEntry.getKey())); + if (shardSyncResponse.shouldDoShardSync()) { + log.info("Periodic shard syncer initiating shard sync for {} due to the reason - ", + streamConfigEntry.getKey(), shardSyncResponse.reasonForDecision()); final ShardSyncTaskManager shardSyncTaskManager = shardSyncTaskManagerProvider .apply(streamConfigEntry.getValue()); - if (!shardSyncTaskManager.castShardSyncTask()) { + if (!shardSyncTaskManager.submitShardSyncTask()) { log.warn( - "Failed to submit shard sync task for stream {}. This could be due to the previous shard sync task not finished.", + "Failed to submit shard sync task for stream {}. This could be due to the previous pending shard sync task.", shardSyncTaskManager.shardDetector().streamIdentifier().streamName()); } } else { - log.info("Skipping shard sync for {} as either hash ranges are complete in the lease table or leases hole confidence is not achieved.", streamConfigEntry.getKey()); + log.info("Skipping shard sync for {} due to the reason - {}", streamConfigEntry.getKey(), + shardSyncResponse.reasonForDecision()); } } } catch (Exception e) { log.error("Caught exception while running periodic shard syncer.", e); } } else { - log.debug(String.format("WorkerId %s is not a leader, not running the shard sync task", workerId)); + log.debug("WorkerId {} is not a leader, not running the shard sync task", workerId); } } @@ -204,35 +208,49 @@ class PeriodicShardSyncManager { } @VisibleForTesting - boolean shouldDoShardSync(StreamIdentifier streamIdentifier, List leases) { + ShardSyncResponse checkForShardSync(StreamIdentifier streamIdentifier, List leases) { if (CollectionUtils.isNullOrEmpty(leases)) { // If the leases is null or empty then we need to do shard sync log.info("No leases found for {}. Will be triggering shard sync", streamIdentifier); - return true; + return new ShardSyncResponse(true, "No leases found for " + streamIdentifier); } // Check if there are any holes in the leases and return the first hole if present. Optional hashRangeHoleOpt = hasHoleInLeases(streamIdentifier, leases); if (hashRangeHoleOpt.isPresent()) { // If hole is present, check if the hole is detected consecutively in previous occurrences. // If hole is determined with high confidence return true; return false otherwise - return hashRangeHoleTrackerMap.computeIfAbsent(streamIdentifier, s -> new HashRangeHoleTracker()) + // We are using the high confidence factor to avoid shard sync on any holes during resharding and + // lease cleanups or any intermittent issues. + final HashRangeHoleTracker hashRangeHoleTracker = hashRangeHoleTrackerMap + .computeIfAbsent(streamIdentifier, s -> new HashRangeHoleTracker()); + final boolean hasHoleWithHighConfidence = hashRangeHoleTracker .hasHighConfidenceOfHoleWith(hashRangeHoleOpt.get()); + return new ShardSyncResponse(hasHoleWithHighConfidence, + "Detected same hole for " + hashRangeHoleTracker.getNumConsecutiveHoles() + + " times. Shard sync will be initiated when threshold breaches " + + CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY); } else { // If hole is not present, clear any previous tracking for this stream and return false; hashRangeHoleTrackerMap.remove(streamIdentifier); - return false; + return new ShardSyncResponse(false, "Hash Ranges are complete for " + streamIdentifier); } } + @Value + @Accessors(fluent = true) + @VisibleForTesting + static class ShardSyncResponse { + private final boolean shouldDoShardSync; + private final String reasonForDecision; + } + private Optional hasHoleInLeases(StreamIdentifier streamIdentifier, List leases) { // Filter the leases with any checkpoint other than shard end. List activeLeases = leases.stream() .filter(lease -> lease.checkpoint() != null && !lease.checkpoint().isShardEnd()).collect(Collectors.toList()); List activeLeasesWithHashRanges = fillWithHashRangesIfRequired(streamIdentifier, activeLeases); - List hashRangesForActiveLeases = activeLeasesWithHashRanges.stream() - .map(lease -> lease.hashKeyRangeForLease()).collect(Collectors.toList()); - return checkForHoleInHashKeyRanges(streamIdentifier, hashRangesForActiveLeases, MIN_HASH_KEY, MAX_HASH_KEY); + return checkForHoleInHashKeyRanges(streamIdentifier, activeLeasesWithHashRanges); } // If leases are missing hashranges information, update the leases in-memory as well as in the lease storage @@ -259,7 +277,7 @@ class PeriodicShardSyncManager { } lease.hashKeyRange(fromHashKeyRange(shard.hashKeyRange())); try { - leaseRefresher.updateLease(lease, UpdateField.HASH_KEY_RANGE); + leaseRefresher.updateLeaseWithMetaInfo(lease, UpdateField.HASH_KEY_RANGE); } catch (Exception e) { log.warn( "Unable to update hash range key information for lease {} of stream {}. This may result in explicit lease sync.", @@ -275,38 +293,46 @@ class PeriodicShardSyncManager { @VisibleForTesting static Optional checkForHoleInHashKeyRanges(StreamIdentifier streamIdentifier, - List hashKeyRanges, BigInteger minHashKey, BigInteger maxHashKey) { - // Sort and merge the overlapping hash ranges. - List mergedHashKeyRanges = sortAndMergeOverlappingHashRanges(hashKeyRanges); - - if(mergedHashKeyRanges.isEmpty()) { - log.error("No valid hashranges found for stream {} between {} and {}.", streamIdentifier, - MIN_HASH_KEY, MAX_HASH_KEY); - return Optional.of(new HashRangeHole(new HashKeyRangeForLease(MIN_HASH_KEY, MAX_HASH_KEY), - new HashKeyRangeForLease(MIN_HASH_KEY, MAX_HASH_KEY))); + List leasesWithHashKeyRanges) { + // Sort the hash ranges by starting hash key. + List sortedLeasesWithHashKeyRanges = sortLeasesByHashRange(leasesWithHashKeyRanges); + if(sortedLeasesWithHashKeyRanges.isEmpty()) { + log.error("No leases with valid hashranges found for stream {}", streamIdentifier); + return Optional.of(new HashRangeHole()); } - // Validate for hashranges bounds. - if (!mergedHashKeyRanges.get(0).startingHashKey().equals(minHashKey) || !mergedHashKeyRanges - .get(mergedHashKeyRanges.size() - 1).endingHashKey().equals(maxHashKey)) { + if (!sortedLeasesWithHashKeyRanges.get(0).hashKeyRangeForLease().startingHashKey().equals(MIN_HASH_KEY) || !sortedLeasesWithHashKeyRanges + .get(sortedLeasesWithHashKeyRanges.size() - 1).hashKeyRangeForLease().endingHashKey().equals(MAX_HASH_KEY)) { log.error("Incomplete hash range found for stream {} between {} and {}.", streamIdentifier, - mergedHashKeyRanges.get(0), - mergedHashKeyRanges.get(mergedHashKeyRanges.size() - 1)); - return Optional.of(new HashRangeHole(mergedHashKeyRanges.get(0), - mergedHashKeyRanges.get(mergedHashKeyRanges.size() - 1))); + sortedLeasesWithHashKeyRanges.get(0), + sortedLeasesWithHashKeyRanges.get(sortedLeasesWithHashKeyRanges.size() - 1)); + return Optional.of(new HashRangeHole(sortedLeasesWithHashKeyRanges.get(0), + sortedLeasesWithHashKeyRanges.get(sortedLeasesWithHashKeyRanges.size() - 1))); } // Check for any holes in the sorted hashrange intervals. - if (mergedHashKeyRanges.size() > 1) { - for (int i = 1; i < mergedHashKeyRanges.size(); i++) { - final HashKeyRangeForLease hashRangeAtStartOfPossibleHole = mergedHashKeyRanges.get(i - 1); - final HashKeyRangeForLease hashRangeAtEndOfPossibleHole = mergedHashKeyRanges.get(i); - final BigInteger startOfPossibleHole = hashRangeAtStartOfPossibleHole.endingHashKey(); - final BigInteger endOfPossibleHole = hashRangeAtEndOfPossibleHole.startingHashKey(); - - if (!endOfPossibleHole.subtract(startOfPossibleHole).equals(BigInteger.ONE)) { - log.error("Incomplete hash range found for {} between {} and {}.", streamIdentifier, - hashRangeAtStartOfPossibleHole, hashRangeAtEndOfPossibleHole); - return Optional.of(new HashRangeHole(hashRangeAtStartOfPossibleHole, hashRangeAtEndOfPossibleHole)); + if (sortedLeasesWithHashKeyRanges.size() > 1) { + Lease leftMostLeaseToReportInCaseOfHole = sortedLeasesWithHashKeyRanges.get(0); + HashKeyRangeForLease leftLeaseHashRange = leftMostLeaseToReportInCaseOfHole.hashKeyRangeForLease(); + for (int i = 1; i < sortedLeasesWithHashKeyRanges.size(); i++) { + final HashKeyRangeForLease rightLeaseHashRange = sortedLeasesWithHashKeyRanges.get(i).hashKeyRangeForLease(); + final BigInteger rangeDiff = rightLeaseHashRange.startingHashKey().subtract(leftLeaseHashRange.endingHashKey()); + // Case of overlapping leases when the rangediff is 0 or negative. + // signum() will be -1 for negative and 0 if value is 0. + // Merge the range for further tracking. + if (rangeDiff.signum() <= 0) { + leftLeaseHashRange = new HashKeyRangeForLease(leftLeaseHashRange.startingHashKey(), + leftLeaseHashRange.endingHashKey().max(rightLeaseHashRange.endingHashKey())); + } else { + // Case of non overlapping leases when rangediff is positive. signum() will be 1 for positive. + // If rangeDiff is 1, then it is a case of continuous hashrange. If not, it is a hole. + if (!rangeDiff.equals(BigInteger.ONE)) { + log.error("Incomplete hash range found for {} between {} and {}.", streamIdentifier, + leftMostLeaseToReportInCaseOfHole, sortedLeasesWithHashKeyRanges.get(i)); + return Optional.of(new HashRangeHole(leftMostLeaseToReportInCaseOfHole, + sortedLeasesWithHashKeyRanges.get(i))); + } + leftMostLeaseToReportInCaseOfHole = sortedLeasesWithHashKeyRanges.get(i); + leftLeaseHashRange = rightLeaseHashRange; } } } @@ -314,41 +340,31 @@ class PeriodicShardSyncManager { } @VisibleForTesting - static List sortAndMergeOverlappingHashRanges( - List hashKeyRanges) { - if (hashKeyRanges.size() == 0 || hashKeyRanges.size() == 1) - return hashKeyRanges; - - Collections.sort(hashKeyRanges, new HashKeyRangeComparator()); - - final HashKeyRangeForLease first = hashKeyRanges.get(0); - BigInteger start = first.startingHashKey(); - BigInteger end = first.endingHashKey(); - - final List result = new ArrayList<>(); - - for (int i = 1; i < hashKeyRanges.size(); i++) { - HashKeyRangeForLease current = hashKeyRanges.get(i); - if (current.startingHashKey().compareTo(end) <= 0) { - end = current.endingHashKey().max(end); - } else { - result.add(new HashKeyRangeForLease(start, end)); - start = current.startingHashKey(); - end = current.endingHashKey(); - } - } - result.add(new HashKeyRangeForLease(start, end)); - return result; + static List sortLeasesByHashRange(List leasesWithHashKeyRanges) { + if (leasesWithHashKeyRanges.size() == 0 || leasesWithHashKeyRanges.size() == 1) + return leasesWithHashKeyRanges; + Collections.sort(leasesWithHashKeyRanges, new HashKeyRangeComparator()); + return leasesWithHashKeyRanges; } @Value private static class HashRangeHole { - private final HashKeyRangeForLease hashRangeAtStartOfPossibleHole; - private final HashKeyRangeForLease hashRangeAtEndOfPossibleHole; + HashRangeHole() { + leaseAtEndOfPossibleHole = leaseAtStartOfPossibleHole = null; + } + + HashRangeHole(Lease leaseAtStartOfPossibleHole, Lease leaseAtEndOfPossibleHole) { + this.leaseAtStartOfPossibleHole = leaseAtStartOfPossibleHole; + this.leaseAtEndOfPossibleHole = leaseAtEndOfPossibleHole; + } + + private final Lease leaseAtStartOfPossibleHole; + private final Lease leaseAtEndOfPossibleHole; } private static class HashRangeHoleTracker { private HashRangeHole hashRangeHole; + @Getter private Integer numConsecutiveHoles; public boolean hasHighConfidenceOfHoleWith(@NonNull HashRangeHole hashRangeHole) { @@ -365,12 +381,17 @@ class PeriodicShardSyncManager { /** * Helper class to compare leases based on their hash range. */ - private static class HashKeyRangeComparator implements Comparator, Serializable { + private static class HashKeyRangeComparator implements Comparator, Serializable { private static final long serialVersionUID = 1L; - @Override public int compare(HashKeyRangeForLease hashKeyRange, HashKeyRangeForLease otherHashKeyRange) { - return hashKeyRange.startingHashKey().compareTo(otherHashKeyRange.startingHashKey()); + @Override public int compare(Lease lease, Lease otherLease) { + Validate.notNull(lease); + Validate.notNull(otherLease); + Validate.notNull(lease.hashKeyRangeForLease()); + Validate.notNull(otherLease.hashKeyRangeForLease()); + return lease.hashKeyRangeForLease().startingHashKey() + .compareTo(otherLease.hashKeyRangeForLease().startingHashKey()); } } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index 3b1003ee..91125a06 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -417,7 +417,7 @@ public class Scheduler implements Runnable { final StreamIdentifier streamIdentifier = getStreamIdentifier(completedShard.streamIdentifierSerOpt()); final StreamConfig streamConfig = currentStreamConfigMap .getOrDefault(streamIdentifier, getDefaultStreamConfig(streamIdentifier)); - if (createOrGetShardSyncTaskManager(streamConfig).castShardSyncTask()) { + if (createOrGetShardSyncTaskManager(streamConfig).submitShardSyncTask()) { log.info("{} : Found completed shard, initiated new ShardSyncTak for {} ", streamIdentifier.serialize(), completedShard.toString()); } @@ -480,7 +480,7 @@ public class Scheduler implements Runnable { if (!currentStreamConfigMap.containsKey(streamIdentifier)) { log.info("Found new stream to process: " + streamIdentifier + ". Syncing shards of that stream."); ShardSyncTaskManager shardSyncTaskManager = createOrGetShardSyncTaskManager(newStreamConfigMap.get(streamIdentifier)); - shardSyncTaskManager.castShardSyncTask(); + shardSyncTaskManager.submitShardSyncTask(); currentStreamConfigMap.put(streamIdentifier, newStreamConfigMap.get(streamIdentifier)); streamsSynced.add(streamIdentifier); } else { @@ -508,7 +508,7 @@ public class Scheduler implements Runnable { + ". Syncing shards of that stream."); ShardSyncTaskManager shardSyncTaskManager = createOrGetShardSyncTaskManager( currentStreamConfigMap.get(streamIdentifier)); - shardSyncTaskManager.castShardSyncTask(); + shardSyncTaskManager.submitShardSyncTask(); currentSetOfStreamsIter.remove(); streamsSynced.add(streamIdentifier); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java index 4ba0cf86..b7f38a4e 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java @@ -201,7 +201,7 @@ public interface LeaseRefresher { * @throws ProvisionedThroughputException if DynamoDB update fails due to lack of capacity * @throws DependencyException if DynamoDB update fails in an unexpected way */ - default void updateLease(Lease lease, UpdateField updateField) + default void updateLeaseWithMetaInfo(Lease lease, UpdateField updateField) throws DependencyException, InvalidStateException, ProvisionedThroughputException { throw new UnsupportedOperationException("updateLeaseWithNoExpectation is not implemented"); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java index de3d4c6c..6a1ceff4 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java @@ -145,10 +145,10 @@ public class ShardSyncTaskManager { } /** - * Cast a ShardSyncTask and return if the casting is successful. + * Submit a ShardSyncTask and return if the submission is successful. * @return if the casting is successful. */ - public boolean castShardSyncTask() { + public boolean submitShardSyncTask() { try { lock.lock(); return checkAndSubmitNextTask(); @@ -205,7 +205,7 @@ public class ShardSyncTaskManager { log.error("Caught exception running {} task: ", currentTask.taskType(), exception != null ? exception : taskResult.getException()); } // Acquire lock here. If shardSyncRequestPending is false in this completionStage and - // castShardSyncTask is invoked, before completion stage exits (future completes) + // submitShardSyncTask is invoked, before completion stage exits (future completes) // but right after the value of shardSyncRequestPending is checked, it will result in // shardSyncRequestPending being set to true, but no pending futures to trigger the next // ShardSyncTask. By executing this stage in a Reentrant lock, we ensure that if the diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/UpdateField.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/UpdateField.java index c15449ca..9461a18e 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/UpdateField.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/UpdateField.java @@ -14,6 +14,13 @@ */ package software.amazon.kinesis.leases; +/** + * These are the special fields that will be updated only once during the lifetime of the lease. + * Since these are meta information that will not affect lease ownership or data durability, we allow + * any elected leader or worker to set these fields directly without any conditional checks. + * Note that though HASH_KEY_RANGE will be available during lease initialization in newer versions, we keep this + * for backfilling while rolling forward to newer versions. + */ public enum UpdateField { CHILD_SHARDS, HASH_KEY_RANGE } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java index 867cc507..30201236 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java @@ -661,7 +661,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { } @Override - public void updateLease(Lease lease, UpdateField updateField) + public void updateLeaseWithMetaInfo(Lease lease, UpdateField updateField) throws DependencyException, InvalidStateException, ProvisionedThroughputException { log.debug("Updating lease without expectation {}", lease); final AWSExceptionManager exceptionManager = createExceptionManager(); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManagerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManagerTest.java index eb88c0dc..9577e7a8 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManagerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManagerTest.java @@ -33,7 +33,6 @@ import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardSyncTaskManager; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; -import java.math.BigInteger; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -71,129 +70,50 @@ public class PeriodicShardSyncManagerTest { shardSyncTaskManagerProvider, true); } - @Test - public void testIfHashRangesAreNotMergedWhenNoOverlappingIntervalsGiven() { - List hashRanges = new ArrayList() {{ - add(deserialize("0", "1")); - add(deserialize("2", "3")); - add(deserialize("4", "23")); - add(deserialize("24", "30")); - }}; - List sortAndMergedHashRanges = PeriodicShardSyncManager - .sortAndMergeOverlappingHashRanges(hashRanges); - Assert.assertEquals(hashRanges, sortAndMergedHashRanges); - } - - @Test - public void testIfHashRangesAreSortedWhenNoOverlappingIntervalsGiven() { - List hashRanges = new ArrayList() {{ - add(deserialize("2", "3")); - add(deserialize("0", "1")); - add(deserialize("24", "30")); - add(deserialize("4", "23")); - }}; - List hashRangesCopy = new ArrayList<>(); - hashRangesCopy.addAll(hashRanges); - List sortAndMergedHashRanges = PeriodicShardSyncManager - .sortAndMergeOverlappingHashRanges(hashRangesCopy); - Assert.assertEquals(hashRangesCopy, sortAndMergedHashRanges); - Assert.assertNotEquals(hashRanges, sortAndMergedHashRanges); - } - - @Test - public void testIfHashRangesAreMergedWhenOverlappingIntervalsGivenCase1() { - List hashRanges = new ArrayList() {{ - add(deserialize("0", "1")); - add(deserialize("2", "3")); - add(deserialize("4", "23")); - add(deserialize("6", "23")); - add(deserialize("24", "30")); - }}; - List expectedHashRanges = new ArrayList() {{ - add(deserialize("0", "1")); - add(deserialize("2", "3")); - add(deserialize("4", "23")); - add(deserialize("24", "30")); - }}; - List sortAndMergedHashRanges = PeriodicShardSyncManager - .sortAndMergeOverlappingHashRanges(hashRanges); - Assert.assertEquals(expectedHashRanges, sortAndMergedHashRanges); - } - - @Test - public void testIfHashRangesAreMergedWhenOverlappingIntervalsGivenCase2() { - List hashRanges = new ArrayList() {{ - add(deserialize("0", "1")); - add(deserialize("2", "3")); - add(deserialize("4", "5")); - add(deserialize("4", "23")); - add(deserialize("24", "30")); - }}; - List expectedHashRanges = new ArrayList() {{ - add(deserialize("0", "1")); - add(deserialize("2", "3")); - add(deserialize("4", "23")); - add(deserialize("24", "30")); - }}; - List sortAndMergedHashRanges = PeriodicShardSyncManager - .sortAndMergeOverlappingHashRanges(hashRanges); - Assert.assertEquals(expectedHashRanges, sortAndMergedHashRanges); - } - - @Test - public void testIfHashRangesAreMergedWhenOverlappingIntervalsGivenCase3() { - List hashRanges = new ArrayList() {{ - add(deserialize("0", "1")); - add(deserialize("2", "3")); - add(deserialize("4", "23")); - add(deserialize("4", "5")); - add(deserialize("24", "30")); - }}; - List expectedHashRanges = new ArrayList() {{ - add(deserialize("0", "1")); - add(deserialize("2", "3")); - add(deserialize("4", "23")); - add(deserialize("24", "30")); - }}; - List sortAndMergedHashRanges = PeriodicShardSyncManager - .sortAndMergeOverlappingHashRanges(hashRanges); - Assert.assertEquals(expectedHashRanges, sortAndMergedHashRanges); - } - @Test public void testForFailureWhenHashRangesAreIncomplete() { - List hashRanges = new ArrayList() {{ + List hashRanges = new ArrayList() {{ add(deserialize("0", "1")); add(deserialize("2", "3")); add(deserialize("4", "23")); add(deserialize("6", "23")); - add(deserialize("25", "30")); // Missing interval here - }}; + add(deserialize("25", MAX_HASH_KEY.toString())); // Missing interval here + }}.stream().map(hashKeyRangeForLease -> { + Lease lease = new MultiStreamLease(); + lease.hashKeyRange(hashKeyRangeForLease); + lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); + return lease; + }).collect(Collectors.toList()); Assert.assertTrue(PeriodicShardSyncManager - .checkForHoleInHashKeyRanges(streamIdentifier, hashRanges, BigInteger.ZERO, BigInteger.valueOf(30)).isPresent()); + .checkForHoleInHashKeyRanges(streamIdentifier, hashRanges).isPresent()); } @Test public void testForSuccessWhenHashRangesAreComplete() { - List hashRanges = new ArrayList() {{ + List hashRanges = new ArrayList() {{ add(deserialize("0", "1")); add(deserialize("2", "3")); add(deserialize("4", "23")); add(deserialize("6", "23")); - add(deserialize("24", "30")); - }}; + add(deserialize("24", MAX_HASH_KEY.toString())); + }}.stream().map(hashKeyRangeForLease -> { + Lease lease = new MultiStreamLease(); + lease.hashKeyRange(hashKeyRangeForLease); + lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); + return lease; + }).collect(Collectors.toList()); Assert.assertFalse(PeriodicShardSyncManager - .checkForHoleInHashKeyRanges(streamIdentifier, hashRanges, BigInteger.ZERO, BigInteger.valueOf(30)).isPresent()); + .checkForHoleInHashKeyRanges(streamIdentifier, hashRanges).isPresent()); } @Test public void testIfShardSyncIsInitiatedWhenNoLeasesArePassed() { - Assert.assertTrue(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, null)); + Assert.assertTrue(periodicShardSyncManager.checkForShardSync(streamIdentifier, null).shouldDoShardSync()); } @Test public void testIfShardSyncIsInitiatedWhenEmptyLeasesArePassed() { - Assert.assertTrue(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, new ArrayList<>())); + Assert.assertTrue(periodicShardSyncManager.checkForShardSync(streamIdentifier, new ArrayList<>()).shouldDoShardSync()); } @Test @@ -211,7 +131,7 @@ public class PeriodicShardSyncManagerTest { return lease; }).collect(Collectors.toList()); IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert - .assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases))); + .assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync())); } @Test @@ -229,8 +149,8 @@ public class PeriodicShardSyncManagerTest { return lease; }).collect(Collectors.toList()); IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert - .assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases))); - Assert.assertTrue(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases)); + .assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync())); + Assert.assertTrue(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()); } @Test @@ -252,8 +172,8 @@ public class PeriodicShardSyncManagerTest { return lease; }).collect(Collectors.toList()); IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert - .assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases))); - Assert.assertTrue(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases)); + .assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync())); + Assert.assertTrue(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()); } @Test @@ -271,8 +191,8 @@ public class PeriodicShardSyncManagerTest { return lease; }).collect(Collectors.toList()); IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert - .assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases))); - Assert.assertTrue(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases)); + .assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync())); + Assert.assertTrue(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()); } @Test @@ -290,7 +210,7 @@ public class PeriodicShardSyncManagerTest { return lease; }).collect(Collectors.toList()); IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert - .assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases))); + .assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync())); List multiStreamLeases2 = new ArrayList() {{ add(deserialize(MIN_HASH_KEY.toString(), "1")); add(deserialize("2", "3")); // Hole between 3 and 5 @@ -305,8 +225,8 @@ public class PeriodicShardSyncManagerTest { }).collect(Collectors.toList()); // Resetting the holes IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert - .assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases2))); - Assert.assertTrue(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases2)); + .assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases2).shouldDoShardSync())); + Assert.assertTrue(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases2).shouldDoShardSync()); } @Test @@ -324,7 +244,7 @@ public class PeriodicShardSyncManagerTest { return lease; }).collect(Collectors.toList()); IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert - .assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases))); + .assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync())); List multiStreamLeases2 = new ArrayList() {{ add(deserialize(MIN_HASH_KEY.toString(), "1")); add(deserialize("2", "3")); // Hole between 3 and 5 @@ -339,11 +259,11 @@ public class PeriodicShardSyncManagerTest { }).collect(Collectors.toList()); // Resetting the holes IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert - .assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases2))); + .assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases2).shouldDoShardSync())); // Resetting the holes IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert - .assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases))); - Assert.assertTrue(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases)); + .assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync())); + Assert.assertTrue(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()); } @Test @@ -385,8 +305,8 @@ public class PeriodicShardSyncManagerTest { // Assert that shard sync should never trigger IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert - .assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases))); - Assert.assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases)); + .assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync())); + Assert.assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()); // Assert that all the leases now has hashRanges set. for(Lease lease : multiStreamLeases) { @@ -433,8 +353,8 @@ public class PeriodicShardSyncManagerTest { // Assert that shard sync should never trigger IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert - .assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases))); - Assert.assertTrue(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases)); + .assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync())); + Assert.assertTrue(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()); // Assert that all the leases now has hashRanges set. for(Lease lease : multiStreamLeases) {