Hash range hole confidence check

This commit is contained in:
Ashwin Giridharan 2020-05-02 03:22:59 -07:00
parent eb00229602
commit a6922d9d7e

View file

@ -17,6 +17,7 @@ package software.amazon.kinesis.coordinator;
import com.google.common.annotations.VisibleForTesting;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NonNull;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.Validate;
@ -61,6 +62,8 @@ class PeriodicShardSyncManager {
private static final long PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 5 * 60 * 1000L;
private static final BigInteger MIN_HASH_KEY = BigInteger.ZERO;
private static final BigInteger MAX_HASH_KEY = new BigInteger("2").pow(128).subtract(BigInteger.ONE);
private static final int CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY = 3;
private Map<StreamIdentifier, HashRangeHoleTracker> hashRangeHoleTrackerMap = new HashMap<>();
private final String workerId;
private final LeaderDecider leaderDecider;
@ -145,6 +148,7 @@ class PeriodicShardSyncManager {
try {
final Map<StreamIdentifier, List<Lease>> streamToLeasesMap = getStreamToLeasesMap(currentStreamConfigMap.keySet());
for (Map.Entry<StreamIdentifier, StreamConfig> streamConfigEntry : currentStreamConfigMap.entrySet()) {
final ShardSyncTaskManager shardSyncTaskManager = shardSyncTaskManagerProvider.apply(streamConfigEntry.getValue());
if (!shardSyncTaskManager.syncShardAndLeaseInfo()) {
log.warn(
@ -160,7 +164,8 @@ class PeriodicShardSyncManager {
}
}
private Map<StreamIdentifier, List<Lease>> getStreamToLeasesMap(final Set<StreamIdentifier> streamIdentifiersToFilter)
private Map<StreamIdentifier, List<Lease>> getStreamToLeasesMap(
final Set<StreamIdentifier> streamIdentifiersToFilter)
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
final List<Lease> leases = leaseRefresher.listLeases();
if (!isMultiStreamingMode) {
@ -179,25 +184,37 @@ class PeriodicShardSyncManager {
}
}
/**
* Checks if the entire hash range is covered
* @return true if covered, false otherwise
*/
private boolean isHashRangeCompleteForLeases(List<Lease> leases) {
// TODO : Catch exception
private boolean shouldDoShardSync(StreamIdentifier streamIdentifier, List<Lease> leases) {
if (CollectionUtils.isNullOrEmpty(leases)) {
return false;
throw new IllegalArgumentException("No leases found to validate for the stream " + streamIdentifier);
}
// Check if there are any holes in the leases and return the first hole if present.
Optional<HashRangeHole> hashRangeHoleOpt = hasHoleInLeases(streamIdentifier, leases);
if (hashRangeHoleOpt.isPresent()) {
// If hole is present, check if the hole is detected consecutively in previous occurrences.
// If hole is determined with high confidence return true; return false otherwise
return hashRangeHoleTrackerMap.computeIfAbsent(streamIdentifier, s -> new HashRangeHoleTracker())
.hasHighConfidenceOfHoleWith(hashRangeHoleOpt.get());
} else {
List<HashKeyRangeForLease> hashRangesForActiveLeases = leases.stream()
.filter(lease -> lease.checkpoint() != null && !lease.checkpoint().isShardEnd())
.map(lease -> lease.hashKeyRangeForLease())
.collect(Collectors.toList());
return !checkForHoleInHashKeyRanges(hashRangesForActiveLeases, MIN_HASH_KEY, MAX_HASH_KEY).isPresent();
// If hole is not present, clear any previous tracking for this stream and return false;
hashRangeHoleTrackerMap.remove(streamIdentifier);
return false;
}
}
private Optional<HashRangeHole> hasHoleInLeases(StreamIdentifier streamIdentifier, List<Lease> leases) {
// Filter the hashranges of leases which has any checkpoint other than shard end.
List<HashKeyRangeForLease> hashRangesForActiveLeases = leases.stream()
.filter(lease -> lease.checkpoint() != null && !lease.checkpoint().isShardEnd())
.map(lease -> lease.hashKeyRangeForLease()).collect(Collectors.toList());
return checkForHoleInHashKeyRanges(streamIdentifier, hashRangesForActiveLeases, MIN_HASH_KEY, MAX_HASH_KEY);
}
@VisibleForTesting
static Optional<HashRangeHole> checkForHoleInHashKeyRanges(List<HashKeyRangeForLease> hashKeyRanges,
BigInteger minHashKey, BigInteger maxHashKey) {
static Optional<HashRangeHole> checkForHoleInHashKeyRanges(StreamIdentifier streamIdentifier,
List<HashKeyRangeForLease> hashKeyRanges, BigInteger minHashKey, BigInteger maxHashKey) {
List<HashKeyRangeForLease> mergedHashKeyRanges = sortAndMergeOverlappingHashRanges(hashKeyRanges);
if (!mergedHashKeyRanges.get(0).startingHashKey().equals(minHashKey) || !mergedHashKeyRanges
@ -225,7 +242,8 @@ class PeriodicShardSyncManager {
}
@VisibleForTesting
static List<HashKeyRangeForLease> sortAndMergeOverlappingHashRanges(List<HashKeyRangeForLease> hashKeyRanges) {
static List<HashKeyRangeForLease> sortAndMergeOverlappingHashRanges(
List<HashKeyRangeForLease> hashKeyRanges) {
if (hashKeyRanges.size() == 0 || hashKeyRanges.size() == 1)
return hashKeyRanges;
@ -257,6 +275,21 @@ class PeriodicShardSyncManager {
private final HashKeyRangeForLease hashRangeAtEndOfPossibleHole;
}
private static class HashRangeHoleTracker {
private HashRangeHole hashRangeHole;
private Integer numConsecutiveHoles;
public boolean hasHighConfidenceOfHoleWith(@NonNull HashRangeHole hashRangeHole) {
if (hashRangeHole.equals(this.hashRangeHole)) {
++this.numConsecutiveHoles;
} else {
this.hashRangeHole = hashRangeHole;
this.numConsecutiveHoles = 1;
}
return numConsecutiveHoles >= CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY;
}
}
/**
* Helper class to compare leases based on their hash range.
*/
@ -264,8 +297,7 @@ class PeriodicShardSyncManager {
private static final long serialVersionUID = 1L;
@Override
public int compare(HashKeyRangeForLease hashKeyRange, HashKeyRangeForLease otherHashKeyRange) {
@Override public int compare(HashKeyRangeForLease hashKeyRange, HashKeyRangeForLease otherHashKeyRange) {
return hashKeyRange.startingHashKey().compareTo(otherHashKeyRange.startingHashKey());
}
}