Addressed review comments

This commit is contained in:
Ashwin Giridharan 2020-05-21 00:57:24 -07:00
parent 02ea8cd70f
commit fc4781e347
7 changed files with 146 additions and 198 deletions

View file

@ -19,6 +19,7 @@ import lombok.EqualsAndHashCode;
import lombok.Getter; import lombok.Getter;
import lombok.NonNull; import lombok.NonNull;
import lombok.Value; import lombok.Value;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.Validate; import org.apache.commons.lang3.Validate;
import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.awssdk.services.kinesis.model.Shard;
@ -161,25 +162,28 @@ class PeriodicShardSyncManager {
// For each of the stream, check if shard sync needs to be done based on the leases state. // For each of the stream, check if shard sync needs to be done based on the leases state.
for (Map.Entry<StreamIdentifier, StreamConfig> streamConfigEntry : currentStreamConfigMap.entrySet()) { for (Map.Entry<StreamIdentifier, StreamConfig> streamConfigEntry : currentStreamConfigMap.entrySet()) {
if (shouldDoShardSync(streamConfigEntry.getKey(), final ShardSyncResponse shardSyncResponse = checkForShardSync(streamConfigEntry.getKey(),
streamToLeasesMap.get(streamConfigEntry.getKey()))) { streamToLeasesMap.get(streamConfigEntry.getKey()));
log.info("Periodic shard syncer initiating shard sync for {}", streamConfigEntry.getKey()); if (shardSyncResponse.shouldDoShardSync()) {
log.info("Periodic shard syncer initiating shard sync for {} due to the reason - ",
streamConfigEntry.getKey(), shardSyncResponse.reasonForDecision());
final ShardSyncTaskManager shardSyncTaskManager = shardSyncTaskManagerProvider final ShardSyncTaskManager shardSyncTaskManager = shardSyncTaskManagerProvider
.apply(streamConfigEntry.getValue()); .apply(streamConfigEntry.getValue());
if (!shardSyncTaskManager.castShardSyncTask()) { if (!shardSyncTaskManager.submitShardSyncTask()) {
log.warn( log.warn(
"Failed to submit shard sync task for stream {}. This could be due to the previous shard sync task not finished.", "Failed to submit shard sync task for stream {}. This could be due to the previous pending shard sync task.",
shardSyncTaskManager.shardDetector().streamIdentifier().streamName()); shardSyncTaskManager.shardDetector().streamIdentifier().streamName());
} }
} else { } else {
log.info("Skipping shard sync for {} as either hash ranges are complete in the lease table or leases hole confidence is not achieved.", streamConfigEntry.getKey()); log.info("Skipping shard sync for {} due to the reason - {}", streamConfigEntry.getKey(),
shardSyncResponse.reasonForDecision());
} }
} }
} catch (Exception e) { } catch (Exception e) {
log.error("Caught exception while running periodic shard syncer.", e); log.error("Caught exception while running periodic shard syncer.", e);
} }
} else { } else {
log.debug(String.format("WorkerId %s is not a leader, not running the shard sync task", workerId)); log.debug("WorkerId {} is not a leader, not running the shard sync task", workerId);
} }
} }
@ -204,35 +208,49 @@ class PeriodicShardSyncManager {
} }
@VisibleForTesting @VisibleForTesting
boolean shouldDoShardSync(StreamIdentifier streamIdentifier, List<Lease> leases) { ShardSyncResponse checkForShardSync(StreamIdentifier streamIdentifier, List<Lease> leases) {
if (CollectionUtils.isNullOrEmpty(leases)) { if (CollectionUtils.isNullOrEmpty(leases)) {
// If the leases is null or empty then we need to do shard sync // If the leases is null or empty then we need to do shard sync
log.info("No leases found for {}. Will be triggering shard sync", streamIdentifier); log.info("No leases found for {}. Will be triggering shard sync", streamIdentifier);
return true; return new ShardSyncResponse(true, "No leases found for " + streamIdentifier);
} }
// Check if there are any holes in the leases and return the first hole if present. // Check if there are any holes in the leases and return the first hole if present.
Optional<HashRangeHole> hashRangeHoleOpt = hasHoleInLeases(streamIdentifier, leases); Optional<HashRangeHole> hashRangeHoleOpt = hasHoleInLeases(streamIdentifier, leases);
if (hashRangeHoleOpt.isPresent()) { if (hashRangeHoleOpt.isPresent()) {
// If hole is present, check if the hole is detected consecutively in previous occurrences. // If hole is present, check if the hole is detected consecutively in previous occurrences.
// If hole is determined with high confidence return true; return false otherwise // If hole is determined with high confidence return true; return false otherwise
return hashRangeHoleTrackerMap.computeIfAbsent(streamIdentifier, s -> new HashRangeHoleTracker()) // We are using the high confidence factor to avoid shard sync on any holes during resharding and
// lease cleanups or any intermittent issues.
final HashRangeHoleTracker hashRangeHoleTracker = hashRangeHoleTrackerMap
.computeIfAbsent(streamIdentifier, s -> new HashRangeHoleTracker());
final boolean hasHoleWithHighConfidence = hashRangeHoleTracker
.hasHighConfidenceOfHoleWith(hashRangeHoleOpt.get()); .hasHighConfidenceOfHoleWith(hashRangeHoleOpt.get());
return new ShardSyncResponse(hasHoleWithHighConfidence,
"Detected same hole for " + hashRangeHoleTracker.getNumConsecutiveHoles()
+ " times. Shard sync will be initiated when threshold breaches "
+ CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY);
} else { } else {
// If hole is not present, clear any previous tracking for this stream and return false; // If hole is not present, clear any previous tracking for this stream and return false;
hashRangeHoleTrackerMap.remove(streamIdentifier); hashRangeHoleTrackerMap.remove(streamIdentifier);
return false; return new ShardSyncResponse(false, "Hash Ranges are complete for " + streamIdentifier);
} }
} }
@Value
@Accessors(fluent = true)
@VisibleForTesting
static class ShardSyncResponse {
private final boolean shouldDoShardSync;
private final String reasonForDecision;
}
private Optional<HashRangeHole> hasHoleInLeases(StreamIdentifier streamIdentifier, List<Lease> leases) { private Optional<HashRangeHole> hasHoleInLeases(StreamIdentifier streamIdentifier, List<Lease> leases) {
// Filter the leases with any checkpoint other than shard end. // Filter the leases with any checkpoint other than shard end.
List<Lease> activeLeases = leases.stream() List<Lease> activeLeases = leases.stream()
.filter(lease -> lease.checkpoint() != null && !lease.checkpoint().isShardEnd()).collect(Collectors.toList()); .filter(lease -> lease.checkpoint() != null && !lease.checkpoint().isShardEnd()).collect(Collectors.toList());
List<Lease> activeLeasesWithHashRanges = fillWithHashRangesIfRequired(streamIdentifier, activeLeases); List<Lease> activeLeasesWithHashRanges = fillWithHashRangesIfRequired(streamIdentifier, activeLeases);
List<HashKeyRangeForLease> hashRangesForActiveLeases = activeLeasesWithHashRanges.stream() return checkForHoleInHashKeyRanges(streamIdentifier, activeLeasesWithHashRanges);
.map(lease -> lease.hashKeyRangeForLease()).collect(Collectors.toList());
return checkForHoleInHashKeyRanges(streamIdentifier, hashRangesForActiveLeases, MIN_HASH_KEY, MAX_HASH_KEY);
} }
// If leases are missing hashranges information, update the leases in-memory as well as in the lease storage // If leases are missing hashranges information, update the leases in-memory as well as in the lease storage
@ -259,7 +277,7 @@ class PeriodicShardSyncManager {
} }
lease.hashKeyRange(fromHashKeyRange(shard.hashKeyRange())); lease.hashKeyRange(fromHashKeyRange(shard.hashKeyRange()));
try { try {
leaseRefresher.updateLease(lease, UpdateField.HASH_KEY_RANGE); leaseRefresher.updateLeaseWithMetaInfo(lease, UpdateField.HASH_KEY_RANGE);
} catch (Exception e) { } catch (Exception e) {
log.warn( log.warn(
"Unable to update hash range key information for lease {} of stream {}. This may result in explicit lease sync.", "Unable to update hash range key information for lease {} of stream {}. This may result in explicit lease sync.",
@ -275,38 +293,46 @@ class PeriodicShardSyncManager {
@VisibleForTesting @VisibleForTesting
static Optional<HashRangeHole> checkForHoleInHashKeyRanges(StreamIdentifier streamIdentifier, static Optional<HashRangeHole> checkForHoleInHashKeyRanges(StreamIdentifier streamIdentifier,
List<HashKeyRangeForLease> hashKeyRanges, BigInteger minHashKey, BigInteger maxHashKey) { List<Lease> leasesWithHashKeyRanges) {
// Sort and merge the overlapping hash ranges. // Sort the hash ranges by starting hash key.
List<HashKeyRangeForLease> mergedHashKeyRanges = sortAndMergeOverlappingHashRanges(hashKeyRanges); List<Lease> sortedLeasesWithHashKeyRanges = sortLeasesByHashRange(leasesWithHashKeyRanges);
if(sortedLeasesWithHashKeyRanges.isEmpty()) {
if(mergedHashKeyRanges.isEmpty()) { log.error("No leases with valid hashranges found for stream {}", streamIdentifier);
log.error("No valid hashranges found for stream {} between {} and {}.", streamIdentifier, return Optional.of(new HashRangeHole());
MIN_HASH_KEY, MAX_HASH_KEY);
return Optional.of(new HashRangeHole(new HashKeyRangeForLease(MIN_HASH_KEY, MAX_HASH_KEY),
new HashKeyRangeForLease(MIN_HASH_KEY, MAX_HASH_KEY)));
} }
// Validate for hashranges bounds. // Validate for hashranges bounds.
if (!mergedHashKeyRanges.get(0).startingHashKey().equals(minHashKey) || !mergedHashKeyRanges if (!sortedLeasesWithHashKeyRanges.get(0).hashKeyRangeForLease().startingHashKey().equals(MIN_HASH_KEY) || !sortedLeasesWithHashKeyRanges
.get(mergedHashKeyRanges.size() - 1).endingHashKey().equals(maxHashKey)) { .get(sortedLeasesWithHashKeyRanges.size() - 1).hashKeyRangeForLease().endingHashKey().equals(MAX_HASH_KEY)) {
log.error("Incomplete hash range found for stream {} between {} and {}.", streamIdentifier, log.error("Incomplete hash range found for stream {} between {} and {}.", streamIdentifier,
mergedHashKeyRanges.get(0), sortedLeasesWithHashKeyRanges.get(0),
mergedHashKeyRanges.get(mergedHashKeyRanges.size() - 1)); sortedLeasesWithHashKeyRanges.get(sortedLeasesWithHashKeyRanges.size() - 1));
return Optional.of(new HashRangeHole(mergedHashKeyRanges.get(0), return Optional.of(new HashRangeHole(sortedLeasesWithHashKeyRanges.get(0),
mergedHashKeyRanges.get(mergedHashKeyRanges.size() - 1))); sortedLeasesWithHashKeyRanges.get(sortedLeasesWithHashKeyRanges.size() - 1)));
} }
// Check for any holes in the sorted hashrange intervals. // Check for any holes in the sorted hashrange intervals.
if (mergedHashKeyRanges.size() > 1) { if (sortedLeasesWithHashKeyRanges.size() > 1) {
for (int i = 1; i < mergedHashKeyRanges.size(); i++) { Lease leftMostLeaseToReportInCaseOfHole = sortedLeasesWithHashKeyRanges.get(0);
final HashKeyRangeForLease hashRangeAtStartOfPossibleHole = mergedHashKeyRanges.get(i - 1); HashKeyRangeForLease leftLeaseHashRange = leftMostLeaseToReportInCaseOfHole.hashKeyRangeForLease();
final HashKeyRangeForLease hashRangeAtEndOfPossibleHole = mergedHashKeyRanges.get(i); for (int i = 1; i < sortedLeasesWithHashKeyRanges.size(); i++) {
final BigInteger startOfPossibleHole = hashRangeAtStartOfPossibleHole.endingHashKey(); final HashKeyRangeForLease rightLeaseHashRange = sortedLeasesWithHashKeyRanges.get(i).hashKeyRangeForLease();
final BigInteger endOfPossibleHole = hashRangeAtEndOfPossibleHole.startingHashKey(); final BigInteger rangeDiff = rightLeaseHashRange.startingHashKey().subtract(leftLeaseHashRange.endingHashKey());
// Case of overlapping leases when the rangediff is 0 or negative.
if (!endOfPossibleHole.subtract(startOfPossibleHole).equals(BigInteger.ONE)) { // signum() will be -1 for negative and 0 if value is 0.
log.error("Incomplete hash range found for {} between {} and {}.", streamIdentifier, // Merge the range for further tracking.
hashRangeAtStartOfPossibleHole, hashRangeAtEndOfPossibleHole); if (rangeDiff.signum() <= 0) {
return Optional.of(new HashRangeHole(hashRangeAtStartOfPossibleHole, hashRangeAtEndOfPossibleHole)); leftLeaseHashRange = new HashKeyRangeForLease(leftLeaseHashRange.startingHashKey(),
leftLeaseHashRange.endingHashKey().max(rightLeaseHashRange.endingHashKey()));
} else {
// Case of non overlapping leases when rangediff is positive. signum() will be 1 for positive.
// If rangeDiff is 1, then it is a case of continuous hashrange. If not, it is a hole.
if (!rangeDiff.equals(BigInteger.ONE)) {
log.error("Incomplete hash range found for {} between {} and {}.", streamIdentifier,
leftMostLeaseToReportInCaseOfHole, sortedLeasesWithHashKeyRanges.get(i));
return Optional.of(new HashRangeHole(leftMostLeaseToReportInCaseOfHole,
sortedLeasesWithHashKeyRanges.get(i)));
}
leftMostLeaseToReportInCaseOfHole = sortedLeasesWithHashKeyRanges.get(i);
leftLeaseHashRange = rightLeaseHashRange;
} }
} }
} }
@ -314,41 +340,31 @@ class PeriodicShardSyncManager {
} }
@VisibleForTesting @VisibleForTesting
static List<HashKeyRangeForLease> sortAndMergeOverlappingHashRanges( static List<Lease> sortLeasesByHashRange(List<Lease> leasesWithHashKeyRanges) {
List<HashKeyRangeForLease> hashKeyRanges) { if (leasesWithHashKeyRanges.size() == 0 || leasesWithHashKeyRanges.size() == 1)
if (hashKeyRanges.size() == 0 || hashKeyRanges.size() == 1) return leasesWithHashKeyRanges;
return hashKeyRanges; Collections.sort(leasesWithHashKeyRanges, new HashKeyRangeComparator());
return leasesWithHashKeyRanges;
Collections.sort(hashKeyRanges, new HashKeyRangeComparator());
final HashKeyRangeForLease first = hashKeyRanges.get(0);
BigInteger start = first.startingHashKey();
BigInteger end = first.endingHashKey();
final List<HashKeyRangeForLease> result = new ArrayList<>();
for (int i = 1; i < hashKeyRanges.size(); i++) {
HashKeyRangeForLease current = hashKeyRanges.get(i);
if (current.startingHashKey().compareTo(end) <= 0) {
end = current.endingHashKey().max(end);
} else {
result.add(new HashKeyRangeForLease(start, end));
start = current.startingHashKey();
end = current.endingHashKey();
}
}
result.add(new HashKeyRangeForLease(start, end));
return result;
} }
@Value @Value
private static class HashRangeHole { private static class HashRangeHole {
private final HashKeyRangeForLease hashRangeAtStartOfPossibleHole; HashRangeHole() {
private final HashKeyRangeForLease hashRangeAtEndOfPossibleHole; leaseAtEndOfPossibleHole = leaseAtStartOfPossibleHole = null;
}
HashRangeHole(Lease leaseAtStartOfPossibleHole, Lease leaseAtEndOfPossibleHole) {
this.leaseAtStartOfPossibleHole = leaseAtStartOfPossibleHole;
this.leaseAtEndOfPossibleHole = leaseAtEndOfPossibleHole;
}
private final Lease leaseAtStartOfPossibleHole;
private final Lease leaseAtEndOfPossibleHole;
} }
private static class HashRangeHoleTracker { private static class HashRangeHoleTracker {
private HashRangeHole hashRangeHole; private HashRangeHole hashRangeHole;
@Getter
private Integer numConsecutiveHoles; private Integer numConsecutiveHoles;
public boolean hasHighConfidenceOfHoleWith(@NonNull HashRangeHole hashRangeHole) { public boolean hasHighConfidenceOfHoleWith(@NonNull HashRangeHole hashRangeHole) {
@ -365,12 +381,17 @@ class PeriodicShardSyncManager {
/** /**
* Helper class to compare leases based on their hash range. * Helper class to compare leases based on their hash range.
*/ */
private static class HashKeyRangeComparator implements Comparator<HashKeyRangeForLease>, Serializable { private static class HashKeyRangeComparator implements Comparator<Lease>, Serializable {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
@Override public int compare(HashKeyRangeForLease hashKeyRange, HashKeyRangeForLease otherHashKeyRange) { @Override public int compare(Lease lease, Lease otherLease) {
return hashKeyRange.startingHashKey().compareTo(otherHashKeyRange.startingHashKey()); Validate.notNull(lease);
Validate.notNull(otherLease);
Validate.notNull(lease.hashKeyRangeForLease());
Validate.notNull(otherLease.hashKeyRangeForLease());
return lease.hashKeyRangeForLease().startingHashKey()
.compareTo(otherLease.hashKeyRangeForLease().startingHashKey());
} }
} }
} }

View file

@ -417,7 +417,7 @@ public class Scheduler implements Runnable {
final StreamIdentifier streamIdentifier = getStreamIdentifier(completedShard.streamIdentifierSerOpt()); final StreamIdentifier streamIdentifier = getStreamIdentifier(completedShard.streamIdentifierSerOpt());
final StreamConfig streamConfig = currentStreamConfigMap final StreamConfig streamConfig = currentStreamConfigMap
.getOrDefault(streamIdentifier, getDefaultStreamConfig(streamIdentifier)); .getOrDefault(streamIdentifier, getDefaultStreamConfig(streamIdentifier));
if (createOrGetShardSyncTaskManager(streamConfig).castShardSyncTask()) { if (createOrGetShardSyncTaskManager(streamConfig).submitShardSyncTask()) {
log.info("{} : Found completed shard, initiated new ShardSyncTak for {} ", log.info("{} : Found completed shard, initiated new ShardSyncTak for {} ",
streamIdentifier.serialize(), completedShard.toString()); streamIdentifier.serialize(), completedShard.toString());
} }
@ -480,7 +480,7 @@ public class Scheduler implements Runnable {
if (!currentStreamConfigMap.containsKey(streamIdentifier)) { if (!currentStreamConfigMap.containsKey(streamIdentifier)) {
log.info("Found new stream to process: " + streamIdentifier + ". Syncing shards of that stream."); log.info("Found new stream to process: " + streamIdentifier + ". Syncing shards of that stream.");
ShardSyncTaskManager shardSyncTaskManager = createOrGetShardSyncTaskManager(newStreamConfigMap.get(streamIdentifier)); ShardSyncTaskManager shardSyncTaskManager = createOrGetShardSyncTaskManager(newStreamConfigMap.get(streamIdentifier));
shardSyncTaskManager.castShardSyncTask(); shardSyncTaskManager.submitShardSyncTask();
currentStreamConfigMap.put(streamIdentifier, newStreamConfigMap.get(streamIdentifier)); currentStreamConfigMap.put(streamIdentifier, newStreamConfigMap.get(streamIdentifier));
streamsSynced.add(streamIdentifier); streamsSynced.add(streamIdentifier);
} else { } else {
@ -508,7 +508,7 @@ public class Scheduler implements Runnable {
+ ". Syncing shards of that stream."); + ". Syncing shards of that stream.");
ShardSyncTaskManager shardSyncTaskManager = createOrGetShardSyncTaskManager( ShardSyncTaskManager shardSyncTaskManager = createOrGetShardSyncTaskManager(
currentStreamConfigMap.get(streamIdentifier)); currentStreamConfigMap.get(streamIdentifier));
shardSyncTaskManager.castShardSyncTask(); shardSyncTaskManager.submitShardSyncTask();
currentSetOfStreamsIter.remove(); currentSetOfStreamsIter.remove();
streamsSynced.add(streamIdentifier); streamsSynced.add(streamIdentifier);
} }

View file

@ -201,7 +201,7 @@ public interface LeaseRefresher {
* @throws ProvisionedThroughputException if DynamoDB update fails due to lack of capacity * @throws ProvisionedThroughputException if DynamoDB update fails due to lack of capacity
* @throws DependencyException if DynamoDB update fails in an unexpected way * @throws DependencyException if DynamoDB update fails in an unexpected way
*/ */
default void updateLease(Lease lease, UpdateField updateField) default void updateLeaseWithMetaInfo(Lease lease, UpdateField updateField)
throws DependencyException, InvalidStateException, ProvisionedThroughputException { throws DependencyException, InvalidStateException, ProvisionedThroughputException {
throw new UnsupportedOperationException("updateLeaseWithNoExpectation is not implemented"); throw new UnsupportedOperationException("updateLeaseWithNoExpectation is not implemented");
} }

View file

@ -145,10 +145,10 @@ public class ShardSyncTaskManager {
} }
/** /**
* Cast a ShardSyncTask and return if the casting is successful. * Submit a ShardSyncTask and return if the submission is successful.
* @return if the casting is successful. * @return if the casting is successful.
*/ */
public boolean castShardSyncTask() { public boolean submitShardSyncTask() {
try { try {
lock.lock(); lock.lock();
return checkAndSubmitNextTask(); return checkAndSubmitNextTask();
@ -205,7 +205,7 @@ public class ShardSyncTaskManager {
log.error("Caught exception running {} task: ", currentTask.taskType(), exception != null ? exception : taskResult.getException()); log.error("Caught exception running {} task: ", currentTask.taskType(), exception != null ? exception : taskResult.getException());
} }
// Acquire lock here. If shardSyncRequestPending is false in this completionStage and // Acquire lock here. If shardSyncRequestPending is false in this completionStage and
// castShardSyncTask is invoked, before completion stage exits (future completes) // submitShardSyncTask is invoked, before completion stage exits (future completes)
// but right after the value of shardSyncRequestPending is checked, it will result in // but right after the value of shardSyncRequestPending is checked, it will result in
// shardSyncRequestPending being set to true, but no pending futures to trigger the next // shardSyncRequestPending being set to true, but no pending futures to trigger the next
// ShardSyncTask. By executing this stage in a Reentrant lock, we ensure that if the // ShardSyncTask. By executing this stage in a Reentrant lock, we ensure that if the

View file

@ -14,6 +14,13 @@
*/ */
package software.amazon.kinesis.leases; package software.amazon.kinesis.leases;
/**
* These are the special fields that will be updated only once during the lifetime of the lease.
* Since these are meta information that will not affect lease ownership or data durability, we allow
* any elected leader or worker to set these fields directly without any conditional checks.
* Note that though HASH_KEY_RANGE will be available during lease initialization in newer versions, we keep this
* for backfilling while rolling forward to newer versions.
*/
public enum UpdateField { public enum UpdateField {
CHILD_SHARDS, HASH_KEY_RANGE CHILD_SHARDS, HASH_KEY_RANGE
} }

View file

@ -661,7 +661,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
} }
@Override @Override
public void updateLease(Lease lease, UpdateField updateField) public void updateLeaseWithMetaInfo(Lease lease, UpdateField updateField)
throws DependencyException, InvalidStateException, ProvisionedThroughputException { throws DependencyException, InvalidStateException, ProvisionedThroughputException {
log.debug("Updating lease without expectation {}", lease); log.debug("Updating lease without expectation {}", lease);
final AWSExceptionManager exceptionManager = createExceptionManager(); final AWSExceptionManager exceptionManager = createExceptionManager();

View file

@ -33,7 +33,6 @@ import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardSyncTaskManager; import software.amazon.kinesis.leases.ShardSyncTaskManager;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import java.math.BigInteger;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -71,129 +70,50 @@ public class PeriodicShardSyncManagerTest {
shardSyncTaskManagerProvider, true); shardSyncTaskManagerProvider, true);
} }
@Test
public void testIfHashRangesAreNotMergedWhenNoOverlappingIntervalsGiven() {
List<HashKeyRangeForLease> hashRanges = new ArrayList<HashKeyRangeForLease>() {{
add(deserialize("0", "1"));
add(deserialize("2", "3"));
add(deserialize("4", "23"));
add(deserialize("24", "30"));
}};
List<HashKeyRangeForLease> sortAndMergedHashRanges = PeriodicShardSyncManager
.sortAndMergeOverlappingHashRanges(hashRanges);
Assert.assertEquals(hashRanges, sortAndMergedHashRanges);
}
@Test
public void testIfHashRangesAreSortedWhenNoOverlappingIntervalsGiven() {
List<HashKeyRangeForLease> hashRanges = new ArrayList<HashKeyRangeForLease>() {{
add(deserialize("2", "3"));
add(deserialize("0", "1"));
add(deserialize("24", "30"));
add(deserialize("4", "23"));
}};
List<HashKeyRangeForLease> hashRangesCopy = new ArrayList<>();
hashRangesCopy.addAll(hashRanges);
List<HashKeyRangeForLease> sortAndMergedHashRanges = PeriodicShardSyncManager
.sortAndMergeOverlappingHashRanges(hashRangesCopy);
Assert.assertEquals(hashRangesCopy, sortAndMergedHashRanges);
Assert.assertNotEquals(hashRanges, sortAndMergedHashRanges);
}
@Test
public void testIfHashRangesAreMergedWhenOverlappingIntervalsGivenCase1() {
List<HashKeyRangeForLease> hashRanges = new ArrayList<HashKeyRangeForLease>() {{
add(deserialize("0", "1"));
add(deserialize("2", "3"));
add(deserialize("4", "23"));
add(deserialize("6", "23"));
add(deserialize("24", "30"));
}};
List<HashKeyRangeForLease> expectedHashRanges = new ArrayList<HashKeyRangeForLease>() {{
add(deserialize("0", "1"));
add(deserialize("2", "3"));
add(deserialize("4", "23"));
add(deserialize("24", "30"));
}};
List<HashKeyRangeForLease> sortAndMergedHashRanges = PeriodicShardSyncManager
.sortAndMergeOverlappingHashRanges(hashRanges);
Assert.assertEquals(expectedHashRanges, sortAndMergedHashRanges);
}
@Test
public void testIfHashRangesAreMergedWhenOverlappingIntervalsGivenCase2() {
List<HashKeyRangeForLease> hashRanges = new ArrayList<HashKeyRangeForLease>() {{
add(deserialize("0", "1"));
add(deserialize("2", "3"));
add(deserialize("4", "5"));
add(deserialize("4", "23"));
add(deserialize("24", "30"));
}};
List<HashKeyRangeForLease> expectedHashRanges = new ArrayList<HashKeyRangeForLease>() {{
add(deserialize("0", "1"));
add(deserialize("2", "3"));
add(deserialize("4", "23"));
add(deserialize("24", "30"));
}};
List<HashKeyRangeForLease> sortAndMergedHashRanges = PeriodicShardSyncManager
.sortAndMergeOverlappingHashRanges(hashRanges);
Assert.assertEquals(expectedHashRanges, sortAndMergedHashRanges);
}
@Test
public void testIfHashRangesAreMergedWhenOverlappingIntervalsGivenCase3() {
List<HashKeyRangeForLease> hashRanges = new ArrayList<HashKeyRangeForLease>() {{
add(deserialize("0", "1"));
add(deserialize("2", "3"));
add(deserialize("4", "23"));
add(deserialize("4", "5"));
add(deserialize("24", "30"));
}};
List<HashKeyRangeForLease> expectedHashRanges = new ArrayList<HashKeyRangeForLease>() {{
add(deserialize("0", "1"));
add(deserialize("2", "3"));
add(deserialize("4", "23"));
add(deserialize("24", "30"));
}};
List<HashKeyRangeForLease> sortAndMergedHashRanges = PeriodicShardSyncManager
.sortAndMergeOverlappingHashRanges(hashRanges);
Assert.assertEquals(expectedHashRanges, sortAndMergedHashRanges);
}
@Test @Test
public void testForFailureWhenHashRangesAreIncomplete() { public void testForFailureWhenHashRangesAreIncomplete() {
List<HashKeyRangeForLease> hashRanges = new ArrayList<HashKeyRangeForLease>() {{ List<Lease> hashRanges = new ArrayList<HashKeyRangeForLease>() {{
add(deserialize("0", "1")); add(deserialize("0", "1"));
add(deserialize("2", "3")); add(deserialize("2", "3"));
add(deserialize("4", "23")); add(deserialize("4", "23"));
add(deserialize("6", "23")); add(deserialize("6", "23"));
add(deserialize("25", "30")); // Missing interval here add(deserialize("25", MAX_HASH_KEY.toString())); // Missing interval here
}}; }}.stream().map(hashKeyRangeForLease -> {
Lease lease = new MultiStreamLease();
lease.hashKeyRange(hashKeyRangeForLease);
lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
return lease;
}).collect(Collectors.toList());
Assert.assertTrue(PeriodicShardSyncManager Assert.assertTrue(PeriodicShardSyncManager
.checkForHoleInHashKeyRanges(streamIdentifier, hashRanges, BigInteger.ZERO, BigInteger.valueOf(30)).isPresent()); .checkForHoleInHashKeyRanges(streamIdentifier, hashRanges).isPresent());
} }
@Test @Test
public void testForSuccessWhenHashRangesAreComplete() { public void testForSuccessWhenHashRangesAreComplete() {
List<HashKeyRangeForLease> hashRanges = new ArrayList<HashKeyRangeForLease>() {{ List<Lease> hashRanges = new ArrayList<HashKeyRangeForLease>() {{
add(deserialize("0", "1")); add(deserialize("0", "1"));
add(deserialize("2", "3")); add(deserialize("2", "3"));
add(deserialize("4", "23")); add(deserialize("4", "23"));
add(deserialize("6", "23")); add(deserialize("6", "23"));
add(deserialize("24", "30")); add(deserialize("24", MAX_HASH_KEY.toString()));
}}; }}.stream().map(hashKeyRangeForLease -> {
Lease lease = new MultiStreamLease();
lease.hashKeyRange(hashKeyRangeForLease);
lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
return lease;
}).collect(Collectors.toList());
Assert.assertFalse(PeriodicShardSyncManager Assert.assertFalse(PeriodicShardSyncManager
.checkForHoleInHashKeyRanges(streamIdentifier, hashRanges, BigInteger.ZERO, BigInteger.valueOf(30)).isPresent()); .checkForHoleInHashKeyRanges(streamIdentifier, hashRanges).isPresent());
} }
@Test @Test
public void testIfShardSyncIsInitiatedWhenNoLeasesArePassed() { public void testIfShardSyncIsInitiatedWhenNoLeasesArePassed() {
Assert.assertTrue(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, null)); Assert.assertTrue(periodicShardSyncManager.checkForShardSync(streamIdentifier, null).shouldDoShardSync());
} }
@Test @Test
public void testIfShardSyncIsInitiatedWhenEmptyLeasesArePassed() { public void testIfShardSyncIsInitiatedWhenEmptyLeasesArePassed() {
Assert.assertTrue(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, new ArrayList<>())); Assert.assertTrue(periodicShardSyncManager.checkForShardSync(streamIdentifier, new ArrayList<>()).shouldDoShardSync());
} }
@Test @Test
@ -211,7 +131,7 @@ public class PeriodicShardSyncManagerTest {
return lease; return lease;
}).collect(Collectors.toList()); }).collect(Collectors.toList());
IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert
.assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases))); .assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()));
} }
@Test @Test
@ -229,8 +149,8 @@ public class PeriodicShardSyncManagerTest {
return lease; return lease;
}).collect(Collectors.toList()); }).collect(Collectors.toList());
IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert
.assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases))); .assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()));
Assert.assertTrue(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases)); Assert.assertTrue(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync());
} }
@Test @Test
@ -252,8 +172,8 @@ public class PeriodicShardSyncManagerTest {
return lease; return lease;
}).collect(Collectors.toList()); }).collect(Collectors.toList());
IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert
.assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases))); .assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()));
Assert.assertTrue(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases)); Assert.assertTrue(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync());
} }
@Test @Test
@ -271,8 +191,8 @@ public class PeriodicShardSyncManagerTest {
return lease; return lease;
}).collect(Collectors.toList()); }).collect(Collectors.toList());
IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert
.assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases))); .assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()));
Assert.assertTrue(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases)); Assert.assertTrue(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync());
} }
@Test @Test
@ -290,7 +210,7 @@ public class PeriodicShardSyncManagerTest {
return lease; return lease;
}).collect(Collectors.toList()); }).collect(Collectors.toList());
IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert
.assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases))); .assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()));
List<Lease> multiStreamLeases2 = new ArrayList<HashKeyRangeForLease>() {{ List<Lease> multiStreamLeases2 = new ArrayList<HashKeyRangeForLease>() {{
add(deserialize(MIN_HASH_KEY.toString(), "1")); add(deserialize(MIN_HASH_KEY.toString(), "1"));
add(deserialize("2", "3")); // Hole between 3 and 5 add(deserialize("2", "3")); // Hole between 3 and 5
@ -305,8 +225,8 @@ public class PeriodicShardSyncManagerTest {
}).collect(Collectors.toList()); }).collect(Collectors.toList());
// Resetting the holes // Resetting the holes
IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert
.assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases2))); .assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases2).shouldDoShardSync()));
Assert.assertTrue(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases2)); Assert.assertTrue(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases2).shouldDoShardSync());
} }
@Test @Test
@ -324,7 +244,7 @@ public class PeriodicShardSyncManagerTest {
return lease; return lease;
}).collect(Collectors.toList()); }).collect(Collectors.toList());
IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert
.assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases))); .assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()));
List<Lease> multiStreamLeases2 = new ArrayList<HashKeyRangeForLease>() {{ List<Lease> multiStreamLeases2 = new ArrayList<HashKeyRangeForLease>() {{
add(deserialize(MIN_HASH_KEY.toString(), "1")); add(deserialize(MIN_HASH_KEY.toString(), "1"));
add(deserialize("2", "3")); // Hole between 3 and 5 add(deserialize("2", "3")); // Hole between 3 and 5
@ -339,11 +259,11 @@ public class PeriodicShardSyncManagerTest {
}).collect(Collectors.toList()); }).collect(Collectors.toList());
// Resetting the holes // Resetting the holes
IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert
.assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases2))); .assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases2).shouldDoShardSync()));
// Resetting the holes // Resetting the holes
IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert
.assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases))); .assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()));
Assert.assertTrue(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases)); Assert.assertTrue(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync());
} }
@Test @Test
@ -385,8 +305,8 @@ public class PeriodicShardSyncManagerTest {
// Assert that shard sync should never trigger // Assert that shard sync should never trigger
IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert
.assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases))); .assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()));
Assert.assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases)); Assert.assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync());
// Assert that all the leases now has hashRanges set. // Assert that all the leases now has hashRanges set.
for(Lease lease : multiStreamLeases) { for(Lease lease : multiStreamLeases) {
@ -433,8 +353,8 @@ public class PeriodicShardSyncManagerTest {
// Assert that shard sync should never trigger // Assert that shard sync should never trigger
IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert
.assertFalse(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases))); .assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()));
Assert.assertTrue(periodicShardSyncManager.shouldDoShardSync(streamIdentifier, multiStreamLeases)); Assert.assertTrue(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync());
// Assert that all the leases now has hashRanges set. // Assert that all the leases now has hashRanges set.
for(Lease lease : multiStreamLeases) { for(Lease lease : multiStreamLeases) {