From 67d2b082fd7580e01cb302cf2d73bc3441f19809 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Tue, 26 May 2020 11:10:10 -0700 Subject: [PATCH] Fixing retry logic --- .../coordinator/PeriodicShardSyncManager.java | 25 ++++++++++--------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java index 5ac4647c..c84547e2 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java @@ -47,6 +47,7 @@ import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.Executors; @@ -166,7 +167,7 @@ class PeriodicShardSyncManager { final ShardSyncResponse shardSyncResponse = checkForShardSync(streamConfigEntry.getKey(), streamToLeasesMap.get(streamConfigEntry.getKey())); if (shardSyncResponse.shouldDoShardSync()) { - log.info("Periodic shard syncer initiating shard sync for {} due to the reason - ", + log.info("Periodic shard syncer initiating shard sync for {} due to the reason - {} ", streamConfigEntry.getKey(), shardSyncResponse.reasonForDecision()); final ShardSyncTaskManager shardSyncTaskManager = shardSyncTaskManagerProvider .apply(streamConfigEntry.getValue()); @@ -228,7 +229,7 @@ class PeriodicShardSyncManager { .hasHighConfidenceOfHoleWith(hashRangeHoleOpt.get()); return new ShardSyncResponse(hasHoleWithHighConfidence, "Detected same hole for " + hashRangeHoleTracker.getNumConsecutiveHoles() - + " times. Shard sync will be initiated when threshold breaches " + + " times. Shard sync will be initiated when threshold reaches " + CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY); } else { @@ -308,8 +309,8 @@ class PeriodicShardSyncManager { log.error("Incomplete hash range found for stream {} between {} and {}.", streamIdentifier, sortedLeasesWithHashKeyRanges.get(0), sortedLeasesWithHashKeyRanges.get(sortedLeasesWithHashKeyRanges.size() - 1)); - return Optional.of(new HashRangeHole(sortedLeasesWithHashKeyRanges.get(0), - sortedLeasesWithHashKeyRanges.get(sortedLeasesWithHashKeyRanges.size() - 1))); + return Optional.of(new HashRangeHole(sortedLeasesWithHashKeyRanges.get(0).hashKeyRangeForLease(), + sortedLeasesWithHashKeyRanges.get(sortedLeasesWithHashKeyRanges.size() - 1).hashKeyRangeForLease())); } // Check for any holes in the sorted hashrange intervals. if (sortedLeasesWithHashKeyRanges.size() > 1) { @@ -330,8 +331,8 @@ class PeriodicShardSyncManager { if (!rangeDiff.equals(BigInteger.ONE)) { log.error("Incomplete hash range found for {} between {} and {}.", streamIdentifier, leftMostLeaseToReportInCaseOfHole, sortedLeasesWithHashKeyRanges.get(i)); - return Optional.of(new HashRangeHole(leftMostLeaseToReportInCaseOfHole, - sortedLeasesWithHashKeyRanges.get(i))); + return Optional.of(new HashRangeHole(leftMostLeaseToReportInCaseOfHole.hashKeyRangeForLease(), + sortedLeasesWithHashKeyRanges.get(i).hashKeyRangeForLease())); } leftMostLeaseToReportInCaseOfHole = sortedLeasesWithHashKeyRanges.get(i); leftLeaseHashRange = rightLeaseHashRange; @@ -352,16 +353,16 @@ class PeriodicShardSyncManager { @Value private static class HashRangeHole { HashRangeHole() { - leaseAtEndOfPossibleHole = leaseAtStartOfPossibleHole = null; + hashRangeAtStartOfPossibleHole = hashRangeAtEndOfPossibleHole = null; } - HashRangeHole(Lease leaseAtStartOfPossibleHole, Lease leaseAtEndOfPossibleHole) { - this.leaseAtStartOfPossibleHole = leaseAtStartOfPossibleHole; - this.leaseAtEndOfPossibleHole = leaseAtEndOfPossibleHole; + HashRangeHole(HashKeyRangeForLease hashRangeAtStartOfPossibleHole, HashKeyRangeForLease hashRangeAtEndOfPossibleHole) { + this.hashRangeAtStartOfPossibleHole = hashRangeAtStartOfPossibleHole; + this.hashRangeAtEndOfPossibleHole = hashRangeAtEndOfPossibleHole; } - private final Lease leaseAtStartOfPossibleHole; - private final Lease leaseAtEndOfPossibleHole; + private final HashKeyRangeForLease hashRangeAtStartOfPossibleHole; + private final HashKeyRangeForLease hashRangeAtEndOfPossibleHole; } private static class HashRangeHoleTracker {