Fixing retry logic

This commit is contained in:
Ashwin Giridharan 2020-05-26 11:10:10 -07:00
parent 08ca1b61bc
commit 67d2b082fd

View file

@ -47,6 +47,7 @@ import java.util.Comparator;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -166,7 +167,7 @@ class PeriodicShardSyncManager {
final ShardSyncResponse shardSyncResponse = checkForShardSync(streamConfigEntry.getKey(), final ShardSyncResponse shardSyncResponse = checkForShardSync(streamConfigEntry.getKey(),
streamToLeasesMap.get(streamConfigEntry.getKey())); streamToLeasesMap.get(streamConfigEntry.getKey()));
if (shardSyncResponse.shouldDoShardSync()) { if (shardSyncResponse.shouldDoShardSync()) {
log.info("Periodic shard syncer initiating shard sync for {} due to the reason - ", log.info("Periodic shard syncer initiating shard sync for {} due to the reason - {} ",
streamConfigEntry.getKey(), shardSyncResponse.reasonForDecision()); streamConfigEntry.getKey(), shardSyncResponse.reasonForDecision());
final ShardSyncTaskManager shardSyncTaskManager = shardSyncTaskManagerProvider final ShardSyncTaskManager shardSyncTaskManager = shardSyncTaskManagerProvider
.apply(streamConfigEntry.getValue()); .apply(streamConfigEntry.getValue());
@ -228,7 +229,7 @@ class PeriodicShardSyncManager {
.hasHighConfidenceOfHoleWith(hashRangeHoleOpt.get()); .hasHighConfidenceOfHoleWith(hashRangeHoleOpt.get());
return new ShardSyncResponse(hasHoleWithHighConfidence, return new ShardSyncResponse(hasHoleWithHighConfidence,
"Detected same hole for " + hashRangeHoleTracker.getNumConsecutiveHoles() "Detected same hole for " + hashRangeHoleTracker.getNumConsecutiveHoles()
+ " times. Shard sync will be initiated when threshold breaches " + " times. Shard sync will be initiated when threshold reaches "
+ CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY); + CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY);
} else { } else {
@ -308,8 +309,8 @@ class PeriodicShardSyncManager {
log.error("Incomplete hash range found for stream {} between {} and {}.", streamIdentifier, log.error("Incomplete hash range found for stream {} between {} and {}.", streamIdentifier,
sortedLeasesWithHashKeyRanges.get(0), sortedLeasesWithHashKeyRanges.get(0),
sortedLeasesWithHashKeyRanges.get(sortedLeasesWithHashKeyRanges.size() - 1)); sortedLeasesWithHashKeyRanges.get(sortedLeasesWithHashKeyRanges.size() - 1));
return Optional.of(new HashRangeHole(sortedLeasesWithHashKeyRanges.get(0), return Optional.of(new HashRangeHole(sortedLeasesWithHashKeyRanges.get(0).hashKeyRangeForLease(),
sortedLeasesWithHashKeyRanges.get(sortedLeasesWithHashKeyRanges.size() - 1))); sortedLeasesWithHashKeyRanges.get(sortedLeasesWithHashKeyRanges.size() - 1).hashKeyRangeForLease()));
} }
// Check for any holes in the sorted hashrange intervals. // Check for any holes in the sorted hashrange intervals.
if (sortedLeasesWithHashKeyRanges.size() > 1) { if (sortedLeasesWithHashKeyRanges.size() > 1) {
@ -330,8 +331,8 @@ class PeriodicShardSyncManager {
if (!rangeDiff.equals(BigInteger.ONE)) { if (!rangeDiff.equals(BigInteger.ONE)) {
log.error("Incomplete hash range found for {} between {} and {}.", streamIdentifier, log.error("Incomplete hash range found for {} between {} and {}.", streamIdentifier,
leftMostLeaseToReportInCaseOfHole, sortedLeasesWithHashKeyRanges.get(i)); leftMostLeaseToReportInCaseOfHole, sortedLeasesWithHashKeyRanges.get(i));
return Optional.of(new HashRangeHole(leftMostLeaseToReportInCaseOfHole, return Optional.of(new HashRangeHole(leftMostLeaseToReportInCaseOfHole.hashKeyRangeForLease(),
sortedLeasesWithHashKeyRanges.get(i))); sortedLeasesWithHashKeyRanges.get(i).hashKeyRangeForLease()));
} }
leftMostLeaseToReportInCaseOfHole = sortedLeasesWithHashKeyRanges.get(i); leftMostLeaseToReportInCaseOfHole = sortedLeasesWithHashKeyRanges.get(i);
leftLeaseHashRange = rightLeaseHashRange; leftLeaseHashRange = rightLeaseHashRange;
@ -352,16 +353,16 @@ class PeriodicShardSyncManager {
@Value @Value
private static class HashRangeHole { private static class HashRangeHole {
HashRangeHole() { HashRangeHole() {
leaseAtEndOfPossibleHole = leaseAtStartOfPossibleHole = null; hashRangeAtStartOfPossibleHole = hashRangeAtEndOfPossibleHole = null;
} }
HashRangeHole(Lease leaseAtStartOfPossibleHole, Lease leaseAtEndOfPossibleHole) { HashRangeHole(HashKeyRangeForLease hashRangeAtStartOfPossibleHole, HashKeyRangeForLease hashRangeAtEndOfPossibleHole) {
this.leaseAtStartOfPossibleHole = leaseAtStartOfPossibleHole; this.hashRangeAtStartOfPossibleHole = hashRangeAtStartOfPossibleHole;
this.leaseAtEndOfPossibleHole = leaseAtEndOfPossibleHole; this.hashRangeAtEndOfPossibleHole = hashRangeAtEndOfPossibleHole;
} }
private final Lease leaseAtStartOfPossibleHole; private final HashKeyRangeForLease hashRangeAtStartOfPossibleHole;
private final Lease leaseAtEndOfPossibleHole; private final HashKeyRangeForLease hashRangeAtEndOfPossibleHole;
} }
private static class HashRangeHoleTracker { private static class HashRangeHoleTracker {