From 57f8c120d04893465877ea31dcefc06f4f68c822 Mon Sep 17 00:00:00 2001 From: Chunxue Yang Date: Thu, 2 Jul 2020 10:15:44 -0700 Subject: [PATCH] Periodc Shard Sync Metrics and Config --- .../worker/KinesisClientLibConfiguration.java | 48 +++++++++++++++++ .../lib/worker/PeriodicShardSyncManager.java | 51 +++++++++++++------ .../clientlibrary/lib/worker/Worker.java | 28 +++++----- .../worker/PeriodicShardSyncManagerTest.java | 31 ++++++----- 4 files changed, 115 insertions(+), 43 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index 86e7a496..d5749133 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -175,6 +175,16 @@ public class KinesisClientLibConfiguration { */ public static final ShardSyncStrategyType DEFAULT_SHARD_SYNC_STRATEGY_TYPE = ShardSyncStrategyType.SHARD_END; + /** + * Default Lease Recovery Auditor execution period for SHARD_END ShardSyncStrategyType. + */ + public static final long LEASES_RECOVERY_AUDITOR_EXECUTION_FREQUENCY_MILLIS = 2 * 60 * 1000L; + + /** + * Default Lease Recovery Auditor inconsistency confidence threshold for running full shard sync for HARD_END ShardSyncStrategyType. + */ + public static final int LEASES_RECOVERY_AUDITOR_INCONSISTENCY_CONFIDENCE_THRESHOLD = 3; + /** * Default Shard prioritization strategy. */ @@ -246,6 +256,8 @@ public class KinesisClientLibConfiguration { private ShardPrioritization shardPrioritization; private long shutdownGraceMillis; private ShardSyncStrategyType shardSyncStrategyType; + private long leasesRecoveryAuditorExecutionFrequencyMillis; + private int leasesRecoveryAuditorInconsistencyConfidenceThreshold; @Getter private Optional timeoutInSeconds = Optional.empty(); @@ -515,6 +527,8 @@ public class KinesisClientLibConfiguration { InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream); this.skipShardSyncAtWorkerInitializationIfLeasesExist = DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST; this.shardSyncStrategyType = DEFAULT_SHARD_SYNC_STRATEGY_TYPE; + this.leasesRecoveryAuditorExecutionFrequencyMillis = LEASES_RECOVERY_AUDITOR_EXECUTION_FREQUENCY_MILLIS; + this.leasesRecoveryAuditorInconsistencyConfidenceThreshold = LEASES_RECOVERY_AUDITOR_INCONSISTENCY_CONFIDENCE_THRESHOLD; this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION; this.recordsFetcherFactory = new SimpleRecordsFetcherFactory(); this.billingMode = billingMode; @@ -625,6 +639,8 @@ public class KinesisClientLibConfiguration { InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream); this.skipShardSyncAtWorkerInitializationIfLeasesExist = DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST; this.shardSyncStrategyType = DEFAULT_SHARD_SYNC_STRATEGY_TYPE; + this.leasesRecoveryAuditorExecutionFrequencyMillis = LEASES_RECOVERY_AUDITOR_EXECUTION_FREQUENCY_MILLIS; + this.leasesRecoveryAuditorInconsistencyConfidenceThreshold = LEASES_RECOVERY_AUDITOR_INCONSISTENCY_CONFIDENCE_THRESHOLD; this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION; this.recordsFetcherFactory = recordsFetcherFactory; this.shutdownGraceMillis = shutdownGraceMillis; @@ -872,6 +888,20 @@ public class KinesisClientLibConfiguration { return shardSyncStrategyType; } + /** + * @return leasesRecoveryAuditorExecutionFrequencyMillis to be used by SHARD_END ShardSyncStrategyType. + */ + public long getLeasesRecoveryAuditorExecutionFrequencyMillis() { + return leasesRecoveryAuditorExecutionFrequencyMillis; + } + + /** + * @return leasesRecoveryAuditorInconsistencyConfidenceThreshold to be used by SHARD_END ShardSyncStrategyType. + */ + public int getLeasesRecoveryAuditorInconsistencyConfidenceThreshold() { + return leasesRecoveryAuditorInconsistencyConfidenceThreshold; + } + /** * @return Max leases this Worker can handle at a time */ @@ -1249,6 +1279,24 @@ public class KinesisClientLibConfiguration { return this; } + /** + * @param leasesRecoveryAuditorExecutionFrequencyMillis Leases Recovery Auditor Execution period. + * @return {@link KinesisClientLibConfiguration} + */ + public KinesisClientLibConfiguration withLeasesRecoveryAuditorExecutionFrequencyMillis(long leasesRecoveryAuditorExecutionFrequencyMillis) { + this.leasesRecoveryAuditorExecutionFrequencyMillis = leasesRecoveryAuditorExecutionFrequencyMillis; + return this; + } + + /** + * @param leasesRecoveryAuditorInconsistencyConfidenceThreshold Leases Recovery Auditor Execution inconsistency confidence threshold. + * @return {@link KinesisClientLibConfiguration} + */ + public KinesisClientLibConfiguration withLeasesRecoveryAuditorInconsistencyConfidenceThreshold(int leasesRecoveryAuditorInconsistencyConfidenceThreshold) { + this.leasesRecoveryAuditorInconsistencyConfidenceThreshold = leasesRecoveryAuditorInconsistencyConfidenceThreshold; + return this; + } + /** * * @param regionName The region name for the service diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncManager.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncManager.java index 75a747e6..32fdec54 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncManager.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncManager.java @@ -34,7 +34,9 @@ import com.amazonaws.services.kinesis.leases.impl.HashKeyRangeForLease; import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; import com.amazonaws.services.kinesis.leases.impl.UpdateField; import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager; +import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper; import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; +import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; import com.amazonaws.services.kinesis.model.Shard; import com.amazonaws.util.CollectionUtils; import com.google.common.annotations.VisibleForTesting; @@ -65,16 +67,12 @@ class PeriodicShardSyncManager { /** DEFAULT interval is used for PERIODIC {@link ShardSyncStrategyType}. */ private static final long DEFAULT_PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 1000L; - /** AUDITOR interval is used for non-PERIODIC {@link ShardSyncStrategyType} auditor processes. */ - private static final long AUDITOR_PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 2 * 60 * 1000L; - /** Parameters for validating hash range completeness when running in auditor mode. */ @VisibleForTesting static final BigInteger MIN_HASH_KEY = BigInteger.ZERO; @VisibleForTesting static final BigInteger MAX_HASH_KEY = new BigInteger("2").pow(128).subtract(BigInteger.ONE); - @VisibleForTesting - static final int CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY = 3; + static final String PERIODIC_SHARD_SYNC_MANAGER = "PeriodicShardSyncManager"; private final HashRangeHoleTracker hashRangeHoleTracker = new HashRangeHoleTracker(); private final String workerId; @@ -86,6 +84,9 @@ class PeriodicShardSyncManager { private final boolean isAuditorMode; private final long periodicShardSyncIntervalMillis; private boolean isRunning; + private final IMetricsFactory metricsFactory; + private final int leasesRecoveryAuditorInconsistencyConfidenceThreshold; + PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, @@ -93,9 +94,12 @@ class PeriodicShardSyncManager { IMetricsFactory metricsFactory, ILeaseManager leaseManager, IKinesisProxy kinesisProxy, - boolean isAuditorMode) { + boolean isAuditorMode, + long leasesRecoveryAuditorExecutionFrequencyMillis, + int leasesRecoveryAuditorInconsistencyConfidenceThreshold) { this(workerId, leaderDecider, shardSyncTask, Executors.newSingleThreadScheduledExecutor(), metricsFactory, - leaseManager, kinesisProxy, isAuditorMode); + leaseManager, kinesisProxy, isAuditorMode, leasesRecoveryAuditorExecutionFrequencyMillis, + leasesRecoveryAuditorInconsistencyConfidenceThreshold); } PeriodicShardSyncManager(String workerId, @@ -105,7 +109,9 @@ class PeriodicShardSyncManager { IMetricsFactory metricsFactory, ILeaseManager leaseManager, IKinesisProxy kinesisProxy, - boolean isAuditorMode) { + boolean isAuditorMode, + long leasesRecoveryAuditorExecutionFrequencyMillis, + int leasesRecoveryAuditorInconsistencyConfidenceThreshold) { Validate.notBlank(workerId, "WorkerID is required to initialize PeriodicShardSyncManager."); Validate.notNull(leaderDecider, "LeaderDecider is required to initialize PeriodicShardSyncManager."); Validate.notNull(shardSyncTask, "ShardSyncTask is required to initialize PeriodicShardSyncManager."); @@ -115,11 +121,13 @@ class PeriodicShardSyncManager { this.shardSyncThreadPool = shardSyncThreadPool; this.leaseManager = leaseManager; this.kinesisProxy = kinesisProxy; + this.metricsFactory = metricsFactory; this.isAuditorMode = isAuditorMode; + this.leasesRecoveryAuditorInconsistencyConfidenceThreshold = leasesRecoveryAuditorInconsistencyConfidenceThreshold; if (isAuditorMode) { Validate.notNull(this.leaseManager, "LeaseManager is required for non-PERIODIC shard sync strategies."); Validate.notNull(this.kinesisProxy, "KinesisProxy is required for non-PERIODIC shard sync strategies."); - this.periodicShardSyncIntervalMillis = AUDITOR_PERIODIC_SHARD_SYNC_INTERVAL_MILLIS; + this.periodicShardSyncIntervalMillis = leasesRecoveryAuditorExecutionFrequencyMillis; } else { this.periodicShardSyncIntervalMillis = DEFAULT_PERIODIC_SHARD_SYNC_INTERVAL_MILLIS; } @@ -166,8 +174,14 @@ class PeriodicShardSyncManager { if (leaderDecider.isLeader(workerId)) { LOG.debug("WorkerId " + workerId + " is a leader, running the shard sync task"); + MetricsHelper.startScope(metricsFactory, PERIODIC_SHARD_SYNC_MANAGER); + boolean isRunSuccess = false; + final long runStartMillis = System.currentTimeMillis(); + try { final ShardSyncResponse shardSyncResponse = checkForShardSync(); + MetricsHelper.addSuccessAndLatency(runStartMillis, shardSyncResponse.shouldDoShardSync(), MetricsLevel.SUMMARY); + MetricsHelper.addSuccessAndLatency(runStartMillis, shardSyncResponse.isHoleDetected(), MetricsLevel.SUMMARY); if (shardSyncResponse.shouldDoShardSync()) { LOG.info("Periodic shard syncer initiating shard sync due to the reason - " + shardSyncResponse.reasonForDecision()); @@ -175,8 +189,12 @@ class PeriodicShardSyncManager { } else { LOG.info("Skipping shard sync due to the reason - " + shardSyncResponse.reasonForDecision()); } + isRunSuccess = true; } catch (Exception e) { LOG.error("Caught exception while running periodic shard syncer.", e); + } finally { + MetricsHelper.addSuccessAndLatency(runStartMillis, isRunSuccess, MetricsLevel.SUMMARY); + MetricsHelper.endScope(); } } else { LOG.debug("WorkerId " + workerId + " is not a leader, not running the shard sync task"); @@ -189,7 +207,7 @@ class PeriodicShardSyncManager { if (!isAuditorMode) { // If we are running with PERIODIC shard sync strategy, we should sync every time. - return new ShardSyncResponse(true, "Syncing every time with PERIODIC shard sync strategy."); + return new ShardSyncResponse(true, false, "Syncing every time with PERIODIC shard sync strategy."); } // Get current leases from DynamoDB. @@ -198,7 +216,7 @@ class PeriodicShardSyncManager { if (CollectionUtils.isNullOrEmpty(currentLeases)) { // If the current leases are null or empty, then we need to initiate a shard sync. LOG.info("No leases found. Will trigger a shard sync."); - return new ShardSyncResponse(true, "No leases found."); + return new ShardSyncResponse(true, false, "No leases found."); } // Check if there are any holes in the hash range covered by current leases. Return the first hole if present. @@ -210,13 +228,13 @@ class PeriodicShardSyncManager { final boolean hasHoleWithHighConfidence = hashRangeHoleTracker.hashHighConfidenceOfHoleWith(hashRangeHoleOpt.get()); - return new ShardSyncResponse(hasHoleWithHighConfidence, + return new ShardSyncResponse(hasHoleWithHighConfidence, true, "Detected the same hole for " + hashRangeHoleTracker.getNumConsecutiveHoles() + " times. " + - "Will initiate shard sync after reaching threshold: " + CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY); + "Will initiate shard sync after reaching threshold: " + leasesRecoveryAuditorInconsistencyConfidenceThreshold); } else { // If hole is not present, clear any previous hole tracking and return false. hashRangeHoleTracker.reset(); - return new ShardSyncResponse(false, "Hash range is complete."); + return new ShardSyncResponse(false, false, "Hash range is complete."); } } @@ -331,6 +349,7 @@ class PeriodicShardSyncManager { @VisibleForTesting static class ShardSyncResponse { private final boolean shouldDoShardSync; + private final boolean isHoleDetected; private final String reasonForDecision; } @@ -350,7 +369,7 @@ class PeriodicShardSyncManager { } } - private static class HashRangeHoleTracker { + private class HashRangeHoleTracker { private HashRangeHole hashRangeHole; @Getter private Integer numConsecutiveHoles; @@ -363,7 +382,7 @@ class PeriodicShardSyncManager { this.numConsecutiveHoles = 1; } - return numConsecutiveHoles >= CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY; + return numConsecutiveHoles >= leasesRecoveryAuditorInconsistencyConfidenceThreshold; } public void reset() { diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 9cdb71b5..8373fa8b 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -1265,19 +1265,21 @@ public class Worker implements Runnable { } return new PeriodicShardSyncManager(config.getWorkerIdentifier(), - leaderDecider, - new ShardSyncTask(streamConfig.getStreamProxy(), - leaseCoordinator.getLeaseManager(), - config.getInitialPositionInStreamExtended(), - config.shouldCleanupLeasesUponShardCompletion(), - config.shouldIgnoreUnexpectedChildShards(), - SHARD_SYNC_SLEEP_FOR_PERIODIC_SHARD_SYNC, - shardSyncer, - null), - metricsFactory, - leaseCoordinator.getLeaseManager(), - streamConfig.getStreamProxy(), - isAuditorMode); + leaderDecider, + new ShardSyncTask(streamConfig.getStreamProxy(), + leaseCoordinator.getLeaseManager(), + config.getInitialPositionInStreamExtended(), + config.shouldCleanupLeasesUponShardCompletion(), + config.shouldIgnoreUnexpectedChildShards(), + SHARD_SYNC_SLEEP_FOR_PERIODIC_SHARD_SYNC, + shardSyncer, + null), + metricsFactory, + leaseCoordinator.getLeaseManager(), + streamConfig.getStreamProxy(), + isAuditorMode, + config.getLeasesRecoveryAuditorExecutionFrequencyMillis(), + config.getLeasesRecoveryAuditorInconsistencyConfidenceThreshold()); } /** diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncManagerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncManagerTest.java index fa628b37..779ba92f 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncManagerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncManagerTest.java @@ -39,7 +39,6 @@ import java.util.Collections; import java.util.List; import java.util.stream.Collectors; -import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.PeriodicShardSyncManager.CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY; import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.PeriodicShardSyncManager.MAX_HASH_KEY; import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.PeriodicShardSyncManager.MIN_HASH_KEY; import static com.amazonaws.services.kinesis.leases.impl.HashKeyRangeForLease.deserialize; @@ -49,6 +48,8 @@ import static org.mockito.Mockito.when; public class PeriodicShardSyncManagerTest { private static final String WORKER_ID = "workerId"; + public static final long LEASES_RECOVERY_AUDITOR_EXECUTION_FREQUENCY_MILLIS = 2 * 60 * 1000L; + public static final int LEASES_RECOVERY_AUDITOR_INCONSISTENCY_CONFIDENCE_THRESHOLD = 3; /** Manager for PERIODIC shard sync strategy */ private PeriodicShardSyncManager periodicShardSyncManager; @@ -70,9 +71,11 @@ public class PeriodicShardSyncManagerTest { @Before public void setup() { periodicShardSyncManager = new PeriodicShardSyncManager(WORKER_ID, leaderDecider, shardSyncTask, - metricsFactory, leaseManager, kinesisProxy, false); + metricsFactory, leaseManager, kinesisProxy, false, LEASES_RECOVERY_AUDITOR_EXECUTION_FREQUENCY_MILLIS, + LEASES_RECOVERY_AUDITOR_INCONSISTENCY_CONFIDENCE_THRESHOLD); auditorPeriodicShardSyncManager = new PeriodicShardSyncManager(WORKER_ID, leaderDecider, shardSyncTask, - metricsFactory, leaseManager, kinesisProxy, true); + metricsFactory, leaseManager, kinesisProxy, true, LEASES_RECOVERY_AUDITOR_EXECUTION_FREQUENCY_MILLIS, + LEASES_RECOVERY_AUDITOR_INCONSISTENCY_CONFIDENCE_THRESHOLD); } @Test @@ -179,7 +182,7 @@ public class PeriodicShardSyncManagerTest { }).collect(Collectors.toList()); when(leaseManager.listLeases()).thenReturn(leases); - for (int i = 1; i < CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY; i++) { + for (int i = 1; i < LEASES_RECOVERY_AUDITOR_INCONSISTENCY_CONFIDENCE_THRESHOLD; i++) { Assert.assertTrue(periodicShardSyncManager.checkForShardSync().shouldDoShardSync()); Assert.assertFalse(auditorPeriodicShardSyncManager.checkForShardSync().shouldDoShardSync()); } @@ -201,7 +204,7 @@ public class PeriodicShardSyncManagerTest { }).collect(Collectors.toList()); when(leaseManager.listLeases()).thenReturn(leases); - for (int i = 1; i < CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY; i++) { + for (int i = 1; i < LEASES_RECOVERY_AUDITOR_INCONSISTENCY_CONFIDENCE_THRESHOLD; i++) { Assert.assertTrue(periodicShardSyncManager.checkForShardSync().shouldDoShardSync()); Assert.assertFalse(auditorPeriodicShardSyncManager.checkForShardSync().shouldDoShardSync()); } @@ -229,7 +232,7 @@ public class PeriodicShardSyncManagerTest { }).collect(Collectors.toList()); when(leaseManager.listLeases()).thenReturn(leases); - for (int i = 1; i < CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY; i++) { + for (int i = 1; i < LEASES_RECOVERY_AUDITOR_INCONSISTENCY_CONFIDENCE_THRESHOLD; i++) { Assert.assertTrue(periodicShardSyncManager.checkForShardSync().shouldDoShardSync()); Assert.assertFalse(auditorPeriodicShardSyncManager.checkForShardSync().shouldDoShardSync()); } @@ -253,7 +256,7 @@ public class PeriodicShardSyncManagerTest { }).collect(Collectors.toList()); when(leaseManager.listLeases()).thenReturn(leases); - for (int i = 1; i < CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY; i++) { + for (int i = 1; i < LEASES_RECOVERY_AUDITOR_INCONSISTENCY_CONFIDENCE_THRESHOLD; i++) { Assert.assertTrue(periodicShardSyncManager.checkForShardSync().shouldDoShardSync()); Assert.assertFalse(auditorPeriodicShardSyncManager.checkForShardSync().shouldDoShardSync()); } @@ -277,7 +280,7 @@ public class PeriodicShardSyncManagerTest { }).collect(Collectors.toList()); when(leaseManager.listLeases()).thenReturn(leases1); - for (int i = 1; i < CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY; i++) { + for (int i = 1; i < LEASES_RECOVERY_AUDITOR_INCONSISTENCY_CONFIDENCE_THRESHOLD; i++) { Assert.assertTrue(periodicShardSyncManager.checkForShardSync().shouldDoShardSync()); Assert.assertFalse(auditorPeriodicShardSyncManager.checkForShardSync().shouldDoShardSync()); } @@ -297,7 +300,7 @@ public class PeriodicShardSyncManagerTest { // Resetting the holes when(leaseManager.listLeases()).thenReturn(leases2); - for (int i = 1; i < CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY; i++) { + for (int i = 1; i < LEASES_RECOVERY_AUDITOR_INCONSISTENCY_CONFIDENCE_THRESHOLD; i++) { Assert.assertTrue(periodicShardSyncManager.checkForShardSync().shouldDoShardSync()); Assert.assertFalse(auditorPeriodicShardSyncManager.checkForShardSync().shouldDoShardSync()); } @@ -321,7 +324,7 @@ public class PeriodicShardSyncManagerTest { }).collect(Collectors.toList()); when(leaseManager.listLeases()).thenReturn(leases1); - for (int i = 1; i < CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY; i++) { + for (int i = 1; i < LEASES_RECOVERY_AUDITOR_INCONSISTENCY_CONFIDENCE_THRESHOLD; i++) { Assert.assertTrue(periodicShardSyncManager.checkForShardSync().shouldDoShardSync()); Assert.assertFalse(auditorPeriodicShardSyncManager.checkForShardSync().shouldDoShardSync()); } @@ -341,13 +344,13 @@ public class PeriodicShardSyncManagerTest { // Resetting the holes when(leaseManager.listLeases()).thenReturn(leases2); - for (int i = 1; i < CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY; i++) { + for (int i = 1; i < LEASES_RECOVERY_AUDITOR_INCONSISTENCY_CONFIDENCE_THRESHOLD; i++) { Assert.assertTrue(periodicShardSyncManager.checkForShardSync().shouldDoShardSync()); Assert.assertFalse(auditorPeriodicShardSyncManager.checkForShardSync().shouldDoShardSync()); } // Resetting the holes again when(leaseManager.listLeases()).thenReturn(leases1); - for (int i = 1; i < CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY; i++) { + for (int i = 1; i < LEASES_RECOVERY_AUDITOR_INCONSISTENCY_CONFIDENCE_THRESHOLD; i++) { Assert.assertTrue(periodicShardSyncManager.checkForShardSync().shouldDoShardSync()); Assert.assertFalse(auditorPeriodicShardSyncManager.checkForShardSync().shouldDoShardSync()); } @@ -392,7 +395,7 @@ public class PeriodicShardSyncManagerTest { when(leaseManager.listLeases()).thenReturn(leases); // Assert that SHARD_END shard sync should never trigger, but PERIODIC shard sync should always trigger - for (int i = 1; i < CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY; i++) { + for (int i = 1; i < LEASES_RECOVERY_AUDITOR_INCONSISTENCY_CONFIDENCE_THRESHOLD; i++) { Assert.assertTrue(periodicShardSyncManager.checkForShardSync().shouldDoShardSync()); Assert.assertFalse(auditorPeriodicShardSyncManager.checkForShardSync().shouldDoShardSync()); } @@ -442,7 +445,7 @@ public class PeriodicShardSyncManagerTest { when(leaseManager.listLeases()).thenReturn(leases); // Assert that shard sync should trigger after breaching threshold - for (int i = 1; i < CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY; i++) { + for (int i = 1; i < LEASES_RECOVERY_AUDITOR_INCONSISTENCY_CONFIDENCE_THRESHOLD; i++) { Assert.assertTrue(periodicShardSyncManager.checkForShardSync().shouldDoShardSync()); Assert.assertFalse(auditorPeriodicShardSyncManager.checkForShardSync().shouldDoShardSync()); }