diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java index 3edec0ec..90c95cf3 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java @@ -17,6 +17,7 @@ package software.amazon.kinesis.coordinator; import com.google.common.annotations.VisibleForTesting; import lombok.EqualsAndHashCode; import lombok.Getter; +import lombok.NonNull; import lombok.Value; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.Validate; @@ -61,6 +62,8 @@ class PeriodicShardSyncManager { private static final long PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 5 * 60 * 1000L; private static final BigInteger MIN_HASH_KEY = BigInteger.ZERO; private static final BigInteger MAX_HASH_KEY = new BigInteger("2").pow(128).subtract(BigInteger.ONE); + private static final int CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY = 3; /* Number of consecutive checks that must report the same hole before HashRangeHoleTracker signals high confidence. */ + private Map hashRangeHoleTrackerMap = new HashMap<>(); /* One tracker per stream, keyed by stream identifier: an entry is created when a check finds a hole and removed when a check finds none. */ private final String workerId; private final LeaderDecider leaderDecider; @@ -145,6 +148,7 @@ class PeriodicShardSyncManager { try { final Map> streamToLeasesMap = getStreamToLeasesMap(currentStreamConfigMap.keySet()); for (Map.Entry streamConfigEntry : currentStreamConfigMap.entrySet()) { + final ShardSyncTaskManager shardSyncTaskManager = shardSyncTaskManagerProvider.apply(streamConfigEntry.getValue()); if (!shardSyncTaskManager.syncShardAndLeaseInfo()) { log.warn( @@ -160,18 +164,19 @@ class PeriodicShardSyncManager { } } - private Map> getStreamToLeasesMap(final Set streamIdentifiersToFilter) + private Map> getStreamToLeasesMap( + final Set streamIdentifiersToFilter) throws DependencyException, ProvisionedThroughputException, InvalidStateException { final List leases = 
leaseRefresher.listLeases(); - if(!isMultiStreamingMode) { + if (!isMultiStreamingMode) { Validate.isTrue(streamIdentifiersToFilter.size() == 1); return Collections.singletonMap(streamIdentifiersToFilter.iterator().next(), leases); } else { final Map> streamToLeasesMap = new HashMap<>(); - for(Lease lease : leases) { + for (Lease lease : leases) { StreamIdentifier streamIdentifier = StreamIdentifier .multiStreamInstance(((MultiStreamLease) lease).streamIdentifier()); - if(streamIdentifiersToFilter.contains(streamIdentifier)) { + if (streamIdentifiersToFilter.contains(streamIdentifier)) { streamToLeasesMap.computeIfAbsent(streamIdentifier, s -> new ArrayList<>()).add(lease); } } @@ -179,25 +184,37 @@ class PeriodicShardSyncManager { } } - /** - * Checks if the entire hash range is covered - * @return true if covered, false otherwise - */ - private boolean isHashRangeCompleteForLeases(List leases) { + // TODO : Catch exception (NOTE(review): presumably the IllegalArgumentException thrown below for an empty lease list - confirm) + private boolean shouldDoShardSync(StreamIdentifier streamIdentifier, List leases) { /* Returns the stream tracker's verdict when a hole is found in the lease hash ranges; otherwise removes the stream's tracker and returns false. Throws IllegalArgumentException when leases is null or empty. */ if (CollectionUtils.isNullOrEmpty(leases)) { - return false; + throw new IllegalArgumentException("No leases found to validate for the stream " + streamIdentifier); + } + // Check if there are any holes in the leases and return the first hole if present. + Optional hashRangeHoleOpt = hasHoleInLeases(streamIdentifier, leases); + if (hashRangeHoleOpt.isPresent()) { + // If a hole is present, check whether the same hole was also detected on the preceding consecutive checks. 
+ // If hole is determined with high confidence return true; return false otherwise + return hashRangeHoleTrackerMap.computeIfAbsent(streamIdentifier, s -> new HashRangeHoleTracker()) + .hasHighConfidenceOfHoleWith(hashRangeHoleOpt.get()); + } else { - List hashRangesForActiveLeases = leases.stream() - .filter(lease -> lease.checkpoint() != null && !lease.checkpoint().isShardEnd()) - .map(lease -> lease.hashKeyRangeForLease()) - .collect(Collectors.toList()); - return !checkForHoleInHashKeyRanges(hashRangesForActiveLeases, MIN_HASH_KEY, MAX_HASH_KEY).isPresent(); + // If no hole is present, clear any previous tracking for this stream and return false. + hashRangeHoleTrackerMap.remove(streamIdentifier); + return false; } } + private Optional hasHoleInLeases(StreamIdentifier streamIdentifier, List leases) { + // Keep only the hash ranges of leases that have a non-null checkpoint other than shard end. + List hashRangesForActiveLeases = leases.stream() + .filter(lease -> lease.checkpoint() != null && !lease.checkpoint().isShardEnd()) + .map(lease -> lease.hashKeyRangeForLease()).collect(Collectors.toList()); + return checkForHoleInHashKeyRanges(streamIdentifier, hashRangesForActiveLeases, MIN_HASH_KEY, MAX_HASH_KEY); /* NOTE(review): if the filter removes every lease, an empty list is passed on; sortAndMergeOverlappingHashRanges returns it unchanged and checkForHoleInHashKeyRanges then calls get(0) on it - confirm this cannot throw. */ + } + @VisibleForTesting - static Optional checkForHoleInHashKeyRanges(List hashKeyRanges, - BigInteger minHashKey, BigInteger maxHashKey) { + static Optional checkForHoleInHashKeyRanges(StreamIdentifier streamIdentifier, + List hashKeyRanges, BigInteger minHashKey, BigInteger maxHashKey) { List mergedHashKeyRanges = sortAndMergeOverlappingHashRanges(hashKeyRanges); if (!mergedHashKeyRanges.get(0).startingHashKey().equals(minHashKey) || !mergedHashKeyRanges @@ -225,8 +242,9 @@ class PeriodicShardSyncManager { } @VisibleForTesting - static List sortAndMergeOverlappingHashRanges(List hashKeyRanges) { - if(hashKeyRanges.size() == 0 || hashKeyRanges.size() == 1) + static List sortAndMergeOverlappingHashRanges( + List hashKeyRanges) { + if (hashKeyRanges.size() == 0 || 
hashKeyRanges.size() == 1) return hashKeyRanges; Collections.sort(hashKeyRanges, new HashKeyRangeComparator()); @@ -257,6 +275,21 @@ class PeriodicShardSyncManager { private final HashKeyRangeForLease hashRangeAtEndOfPossibleHole; } + private static class HashRangeHoleTracker { /* Remembers the most recently reported hole and how many consecutive calls reported that same hole. */ + private HashRangeHole hashRangeHole; + private Integer numConsecutiveHoles; /* NOTE(review): boxed and left null until first assigned in hasHighConfidenceOfHoleWith; a primitive int may suffice - confirm. */ + + public boolean hasHighConfidenceOfHoleWith(@NonNull HashRangeHole hashRangeHole) { /* Returns true once the same hole has been reported CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY or more times in a row. */ + if (hashRangeHole.equals(this.hashRangeHole)) { + ++this.numConsecutiveHoles; + } else { /* A hole that does not equal the remembered one replaces it and restarts the count at 1. */ + this.hashRangeHole = hashRangeHole; + this.numConsecutiveHoles = 1; + } + return numConsecutiveHoles >= CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY; + } + } + /** * Helper class to compare leases based on their hash range. */ @@ -264,8 +297,7 @@ class PeriodicShardSyncManager { private static final long serialVersionUID = 1L; - @Override - public int compare(HashKeyRangeForLease hashKeyRange, HashKeyRangeForLease otherHashKeyRange) { + @Override public int compare(HashKeyRangeForLease hashKeyRange, HashKeyRangeForLease otherHashKeyRange) { return hashKeyRange.startingHashKey().compareTo(otherHashKeyRange.startingHashKey()); } }