Merge pull request #72 from ychunxue/periodicShardSyncMetric
Periodic shard sync metric
This commit is contained in:
commit
b7468267f4
4 changed files with 114 additions and 44 deletions
|
|
@ -193,6 +193,16 @@ public class KinesisClientLibConfiguration {
|
|||
*/
|
||||
public static final ShardSyncStrategyType DEFAULT_SHARD_SYNC_STRATEGY_TYPE = ShardSyncStrategyType.SHARD_END;
|
||||
|
||||
/**
|
||||
* Default Lease Recovery Auditor execution period for SHARD_END ShardSyncStrategyType.
|
||||
*/
|
||||
public static final long LEASES_RECOVERY_AUDITOR_EXECUTION_FREQUENCY_MILLIS = 2 * 60 * 1000L;
|
||||
|
||||
/**
|
||||
* Default Lease Recovery Auditor inconsistency confidence threshold for running full shard sync for SHARD_END ShardSyncStrategyType.
|
||||
*/
|
||||
public static final int LEASES_RECOVERY_AUDITOR_INCONSISTENCY_CONFIDENCE_THRESHOLD = 3;
|
||||
|
||||
/**
|
||||
* Default Shard prioritization strategy.
|
||||
*/
|
||||
|
|
@ -267,6 +277,8 @@ public class KinesisClientLibConfiguration {
|
|||
private long leaseCleanupIntervalMillis;
|
||||
private long completedLeaseCleanupThresholdMillis;
|
||||
private long garbageLeaseCleanupThresholdMillis;
|
||||
private long leasesRecoveryAuditorExecutionFrequencyMillis;
|
||||
private int leasesRecoveryAuditorInconsistencyConfidenceThreshold;
|
||||
|
||||
@Getter
|
||||
private Optional<Integer> timeoutInSeconds = Optional.empty();
|
||||
|
|
@ -696,6 +708,8 @@ public class KinesisClientLibConfiguration {
|
|||
InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream);
|
||||
this.skipShardSyncAtWorkerInitializationIfLeasesExist = DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST;
|
||||
this.shardSyncStrategyType = DEFAULT_SHARD_SYNC_STRATEGY_TYPE;
|
||||
this.leasesRecoveryAuditorExecutionFrequencyMillis = LEASES_RECOVERY_AUDITOR_EXECUTION_FREQUENCY_MILLIS;
|
||||
this.leasesRecoveryAuditorInconsistencyConfidenceThreshold = LEASES_RECOVERY_AUDITOR_INCONSISTENCY_CONFIDENCE_THRESHOLD;
|
||||
this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION;
|
||||
this.recordsFetcherFactory = recordsFetcherFactory;
|
||||
this.leaseCleanupIntervalMillis = leaseCleanupIntervalMillis;
|
||||
|
|
@ -703,7 +717,6 @@ public class KinesisClientLibConfiguration {
|
|||
this.garbageLeaseCleanupThresholdMillis = garbageLeaseCleanupThresholdMillis;
|
||||
this.shutdownGraceMillis = shutdownGraceMillis;
|
||||
this.billingMode = billingMode;
|
||||
|
||||
}
|
||||
|
||||
// Check if value is positive, otherwise throw an exception
|
||||
|
|
@ -971,6 +984,20 @@ public class KinesisClientLibConfiguration {
|
|||
return shardSyncStrategyType;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return leasesRecoveryAuditorExecutionFrequencyMillis to be used by SHARD_END ShardSyncStrategyType.
|
||||
*/
|
||||
public long getLeasesRecoveryAuditorExecutionFrequencyMillis() {
|
||||
return leasesRecoveryAuditorExecutionFrequencyMillis;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return leasesRecoveryAuditorInconsistencyConfidenceThreshold to be used by SHARD_END ShardSyncStrategyType.
|
||||
*/
|
||||
public int getLeasesRecoveryAuditorInconsistencyConfidenceThreshold() {
|
||||
return leasesRecoveryAuditorInconsistencyConfidenceThreshold;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Max leases this Worker can handle at a time
|
||||
*/
|
||||
|
|
@ -1348,6 +1375,24 @@ public class KinesisClientLibConfiguration {
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param leasesRecoveryAuditorExecutionFrequencyMillis Leases Recovery Auditor Execution period.
|
||||
* @return {@link KinesisClientLibConfiguration}
|
||||
*/
|
||||
public KinesisClientLibConfiguration withLeasesRecoveryAuditorExecutionFrequencyMillis(long leasesRecoveryAuditorExecutionFrequencyMillis) {
|
||||
this.leasesRecoveryAuditorExecutionFrequencyMillis = leasesRecoveryAuditorExecutionFrequencyMillis;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param leasesRecoveryAuditorInconsistencyConfidenceThreshold Leases Recovery Auditor Execution inconsistency confidence threshold.
|
||||
* @return {@link KinesisClientLibConfiguration}
|
||||
*/
|
||||
public KinesisClientLibConfiguration withLeasesRecoveryAuditorInconsistencyConfidenceThreshold(int leasesRecoveryAuditorInconsistencyConfidenceThreshold) {
|
||||
this.leasesRecoveryAuditorInconsistencyConfidenceThreshold = leasesRecoveryAuditorInconsistencyConfidenceThreshold;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param regionName The region name for the service
|
||||
|
|
|
|||
|
|
@ -26,6 +26,7 @@ import java.util.concurrent.ScheduledExecutorService;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import com.amazonaws.services.cloudwatch.model.StandardUnit;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
|
||||
import com.amazonaws.services.kinesis.leases.exceptions.DependencyException;
|
||||
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
|
||||
|
|
@ -34,7 +35,9 @@ import com.amazonaws.services.kinesis.leases.impl.HashKeyRangeForLease;
|
|||
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
|
||||
import com.amazonaws.services.kinesis.leases.impl.UpdateField;
|
||||
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
|
||||
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
|
||||
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
|
||||
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
|
||||
import com.amazonaws.services.kinesis.model.Shard;
|
||||
import com.amazonaws.util.CollectionUtils;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
|
@ -65,16 +68,12 @@ class PeriodicShardSyncManager {
|
|||
/** DEFAULT interval is used for PERIODIC {@link ShardSyncStrategyType}. */
|
||||
private static final long DEFAULT_PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 1000L;
|
||||
|
||||
/** AUDITOR interval is used for non-PERIODIC {@link ShardSyncStrategyType} auditor processes. */
|
||||
private static final long AUDITOR_PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 2 * 60 * 1000L;
|
||||
|
||||
/** Parameters for validating hash range completeness when running in auditor mode. */
|
||||
@VisibleForTesting
|
||||
static final BigInteger MIN_HASH_KEY = BigInteger.ZERO;
|
||||
@VisibleForTesting
|
||||
static final BigInteger MAX_HASH_KEY = new BigInteger("2").pow(128).subtract(BigInteger.ONE);
|
||||
@VisibleForTesting
|
||||
static final int CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY = 3;
|
||||
static final String PERIODIC_SHARD_SYNC_MANAGER = "PeriodicShardSyncManager";
|
||||
private final HashRangeHoleTracker hashRangeHoleTracker = new HashRangeHoleTracker();
|
||||
|
||||
private final String workerId;
|
||||
|
|
@ -86,6 +85,9 @@ class PeriodicShardSyncManager {
|
|||
private final boolean isAuditorMode;
|
||||
private final long periodicShardSyncIntervalMillis;
|
||||
private boolean isRunning;
|
||||
private final IMetricsFactory metricsFactory;
|
||||
private final int leasesRecoveryAuditorInconsistencyConfidenceThreshold;
|
||||
|
||||
|
||||
PeriodicShardSyncManager(String workerId,
|
||||
LeaderDecider leaderDecider,
|
||||
|
|
@ -93,9 +95,12 @@ class PeriodicShardSyncManager {
|
|||
IMetricsFactory metricsFactory,
|
||||
ILeaseManager<KinesisClientLease> leaseManager,
|
||||
IKinesisProxy kinesisProxy,
|
||||
boolean isAuditorMode) {
|
||||
boolean isAuditorMode,
|
||||
long leasesRecoveryAuditorExecutionFrequencyMillis,
|
||||
int leasesRecoveryAuditorInconsistencyConfidenceThreshold) {
|
||||
this(workerId, leaderDecider, shardSyncTask, Executors.newSingleThreadScheduledExecutor(), metricsFactory,
|
||||
leaseManager, kinesisProxy, isAuditorMode);
|
||||
leaseManager, kinesisProxy, isAuditorMode, leasesRecoveryAuditorExecutionFrequencyMillis,
|
||||
leasesRecoveryAuditorInconsistencyConfidenceThreshold);
|
||||
}
|
||||
|
||||
PeriodicShardSyncManager(String workerId,
|
||||
|
|
@ -105,7 +110,9 @@ class PeriodicShardSyncManager {
|
|||
IMetricsFactory metricsFactory,
|
||||
ILeaseManager<KinesisClientLease> leaseManager,
|
||||
IKinesisProxy kinesisProxy,
|
||||
boolean isAuditorMode) {
|
||||
boolean isAuditorMode,
|
||||
long leasesRecoveryAuditorExecutionFrequencyMillis,
|
||||
int leasesRecoveryAuditorInconsistencyConfidenceThreshold) {
|
||||
Validate.notBlank(workerId, "WorkerID is required to initialize PeriodicShardSyncManager.");
|
||||
Validate.notNull(leaderDecider, "LeaderDecider is required to initialize PeriodicShardSyncManager.");
|
||||
Validate.notNull(shardSyncTask, "ShardSyncTask is required to initialize PeriodicShardSyncManager.");
|
||||
|
|
@ -115,11 +122,13 @@ class PeriodicShardSyncManager {
|
|||
this.shardSyncThreadPool = shardSyncThreadPool;
|
||||
this.leaseManager = leaseManager;
|
||||
this.kinesisProxy = kinesisProxy;
|
||||
this.metricsFactory = metricsFactory;
|
||||
this.isAuditorMode = isAuditorMode;
|
||||
this.leasesRecoveryAuditorInconsistencyConfidenceThreshold = leasesRecoveryAuditorInconsistencyConfidenceThreshold;
|
||||
if (isAuditorMode) {
|
||||
Validate.notNull(this.leaseManager, "LeaseManager is required for non-PERIODIC shard sync strategies.");
|
||||
Validate.notNull(this.kinesisProxy, "KinesisProxy is required for non-PERIODIC shard sync strategies.");
|
||||
this.periodicShardSyncIntervalMillis = AUDITOR_PERIODIC_SHARD_SYNC_INTERVAL_MILLIS;
|
||||
this.periodicShardSyncIntervalMillis = leasesRecoveryAuditorExecutionFrequencyMillis;
|
||||
} else {
|
||||
this.periodicShardSyncIntervalMillis = DEFAULT_PERIODIC_SHARD_SYNC_INTERVAL_MILLIS;
|
||||
}
|
||||
|
|
@ -166,8 +175,14 @@ class PeriodicShardSyncManager {
|
|||
if (leaderDecider.isLeader(workerId)) {
|
||||
LOG.debug("WorkerId " + workerId + " is a leader, running the shard sync task");
|
||||
|
||||
MetricsHelper.startScope(metricsFactory, PERIODIC_SHARD_SYNC_MANAGER);
|
||||
boolean isRunSuccess = false;
|
||||
final long runStartMillis = System.currentTimeMillis();
|
||||
|
||||
try {
|
||||
final ShardSyncResponse shardSyncResponse = checkForShardSync();
|
||||
MetricsHelper.getMetricsScope().addData("InitiatingShardSync", shardSyncResponse.shouldDoShardSync() ? 1 : 0, StandardUnit.Count, MetricsLevel.SUMMARY);
|
||||
MetricsHelper.getMetricsScope().addData("DetectedIncompleteLease", shardSyncResponse.isHoleDetected() ? 1 : 0, StandardUnit.Count, MetricsLevel.SUMMARY);
|
||||
if (shardSyncResponse.shouldDoShardSync()) {
|
||||
LOG.info("Periodic shard syncer initiating shard sync due to the reason - " +
|
||||
shardSyncResponse.reasonForDecision());
|
||||
|
|
@ -175,8 +190,12 @@ class PeriodicShardSyncManager {
|
|||
} else {
|
||||
LOG.info("Skipping shard sync due to the reason - " + shardSyncResponse.reasonForDecision());
|
||||
}
|
||||
isRunSuccess = true;
|
||||
} catch (Exception e) {
|
||||
LOG.error("Caught exception while running periodic shard syncer.", e);
|
||||
} finally {
|
||||
MetricsHelper.addSuccessAndLatency(runStartMillis, isRunSuccess, MetricsLevel.SUMMARY);
|
||||
MetricsHelper.endScope();
|
||||
}
|
||||
} else {
|
||||
LOG.debug("WorkerId " + workerId + " is not a leader, not running the shard sync task");
|
||||
|
|
@ -189,7 +208,7 @@ class PeriodicShardSyncManager {
|
|||
|
||||
if (!isAuditorMode) {
|
||||
// If we are running with PERIODIC shard sync strategy, we should sync every time.
|
||||
return new ShardSyncResponse(true, "Syncing every time with PERIODIC shard sync strategy.");
|
||||
return new ShardSyncResponse(true, false, "Syncing every time with PERIODIC shard sync strategy.");
|
||||
}
|
||||
|
||||
// Get current leases from DynamoDB.
|
||||
|
|
@ -198,7 +217,7 @@ class PeriodicShardSyncManager {
|
|||
if (CollectionUtils.isNullOrEmpty(currentLeases)) {
|
||||
// If the current leases are null or empty, then we need to initiate a shard sync.
|
||||
LOG.info("No leases found. Will trigger a shard sync.");
|
||||
return new ShardSyncResponse(true, "No leases found.");
|
||||
return new ShardSyncResponse(true, false, "No leases found.");
|
||||
}
|
||||
|
||||
// Check if there are any holes in the hash range covered by current leases. Return the first hole if present.
|
||||
|
|
@ -210,13 +229,13 @@ class PeriodicShardSyncManager {
|
|||
final boolean hasHoleWithHighConfidence =
|
||||
hashRangeHoleTracker.hashHighConfidenceOfHoleWith(hashRangeHoleOpt.get());
|
||||
|
||||
return new ShardSyncResponse(hasHoleWithHighConfidence,
|
||||
return new ShardSyncResponse(hasHoleWithHighConfidence, true,
|
||||
"Detected the same hole for " + hashRangeHoleTracker.getNumConsecutiveHoles() + " times. " +
|
||||
"Will initiate shard sync after reaching threshold: " + CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY);
|
||||
"Will initiate shard sync after reaching threshold: " + leasesRecoveryAuditorInconsistencyConfidenceThreshold);
|
||||
} else {
|
||||
// If hole is not present, clear any previous hole tracking and return false.
|
||||
hashRangeHoleTracker.reset();
|
||||
return new ShardSyncResponse(false, "Hash range is complete.");
|
||||
return new ShardSyncResponse(false, false, "Hash range is complete.");
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -331,6 +350,7 @@ class PeriodicShardSyncManager {
|
|||
@VisibleForTesting
|
||||
static class ShardSyncResponse {
|
||||
private final boolean shouldDoShardSync;
|
||||
private final boolean isHoleDetected;
|
||||
private final String reasonForDecision;
|
||||
}
|
||||
|
||||
|
|
@ -350,7 +370,7 @@ class PeriodicShardSyncManager {
|
|||
}
|
||||
}
|
||||
|
||||
private static class HashRangeHoleTracker {
|
||||
private class HashRangeHoleTracker {
|
||||
private HashRangeHole hashRangeHole;
|
||||
@Getter
|
||||
private Integer numConsecutiveHoles;
|
||||
|
|
@ -363,7 +383,7 @@ class PeriodicShardSyncManager {
|
|||
this.numConsecutiveHoles = 1;
|
||||
}
|
||||
|
||||
return numConsecutiveHoles >= CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY;
|
||||
return numConsecutiveHoles >= leasesRecoveryAuditorInconsistencyConfidenceThreshold;
|
||||
}
|
||||
|
||||
public void reset() {
|
||||
|
|
|
|||
|
|
@ -1281,19 +1281,21 @@ public class Worker implements Runnable {
|
|||
}
|
||||
|
||||
return new PeriodicShardSyncManager(config.getWorkerIdentifier(),
|
||||
leaderDecider,
|
||||
new ShardSyncTask(streamConfig.getStreamProxy(),
|
||||
leaseCoordinator.getLeaseManager(),
|
||||
config.getInitialPositionInStreamExtended(),
|
||||
config.shouldCleanupLeasesUponShardCompletion(),
|
||||
config.shouldIgnoreUnexpectedChildShards(),
|
||||
SHARD_SYNC_SLEEP_FOR_PERIODIC_SHARD_SYNC,
|
||||
shardSyncer,
|
||||
null),
|
||||
metricsFactory,
|
||||
leaseCoordinator.getLeaseManager(),
|
||||
streamConfig.getStreamProxy(),
|
||||
isAuditorMode);
|
||||
leaderDecider,
|
||||
new ShardSyncTask(streamConfig.getStreamProxy(),
|
||||
leaseCoordinator.getLeaseManager(),
|
||||
config.getInitialPositionInStreamExtended(),
|
||||
config.shouldCleanupLeasesUponShardCompletion(),
|
||||
config.shouldIgnoreUnexpectedChildShards(),
|
||||
SHARD_SYNC_SLEEP_FOR_PERIODIC_SHARD_SYNC,
|
||||
shardSyncer,
|
||||
null),
|
||||
metricsFactory,
|
||||
leaseCoordinator.getLeaseManager(),
|
||||
streamConfig.getStreamProxy(),
|
||||
isAuditorMode,
|
||||
config.getLeasesRecoveryAuditorExecutionFrequencyMillis(),
|
||||
config.getLeasesRecoveryAuditorInconsistencyConfidenceThreshold());
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -39,7 +39,6 @@ import java.util.Collections;
|
|||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.PeriodicShardSyncManager.CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY;
|
||||
import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.PeriodicShardSyncManager.MAX_HASH_KEY;
|
||||
import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.PeriodicShardSyncManager.MIN_HASH_KEY;
|
||||
import static com.amazonaws.services.kinesis.leases.impl.HashKeyRangeForLease.deserialize;
|
||||
|
|
@ -49,6 +48,8 @@ import static org.mockito.Mockito.when;
|
|||
public class PeriodicShardSyncManagerTest {
|
||||
|
||||
private static final String WORKER_ID = "workerId";
|
||||
public static final long LEASES_RECOVERY_AUDITOR_EXECUTION_FREQUENCY_MILLIS = 2 * 60 * 1000L;
|
||||
public static final int LEASES_RECOVERY_AUDITOR_INCONSISTENCY_CONFIDENCE_THRESHOLD = 3;
|
||||
|
||||
/** Manager for PERIODIC shard sync strategy */
|
||||
private PeriodicShardSyncManager periodicShardSyncManager;
|
||||
|
|
@ -70,9 +71,11 @@ public class PeriodicShardSyncManagerTest {
|
|||
@Before
|
||||
public void setup() {
|
||||
periodicShardSyncManager = new PeriodicShardSyncManager(WORKER_ID, leaderDecider, shardSyncTask,
|
||||
metricsFactory, leaseManager, kinesisProxy, false);
|
||||
metricsFactory, leaseManager, kinesisProxy, false, LEASES_RECOVERY_AUDITOR_EXECUTION_FREQUENCY_MILLIS,
|
||||
LEASES_RECOVERY_AUDITOR_INCONSISTENCY_CONFIDENCE_THRESHOLD);
|
||||
auditorPeriodicShardSyncManager = new PeriodicShardSyncManager(WORKER_ID, leaderDecider, shardSyncTask,
|
||||
metricsFactory, leaseManager, kinesisProxy, true);
|
||||
metricsFactory, leaseManager, kinesisProxy, true, LEASES_RECOVERY_AUDITOR_EXECUTION_FREQUENCY_MILLIS,
|
||||
LEASES_RECOVERY_AUDITOR_INCONSISTENCY_CONFIDENCE_THRESHOLD);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -179,7 +182,7 @@ public class PeriodicShardSyncManagerTest {
|
|||
}).collect(Collectors.toList());
|
||||
|
||||
when(leaseManager.listLeases()).thenReturn(leases);
|
||||
for (int i = 1; i < CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY; i++) {
|
||||
for (int i = 1; i < LEASES_RECOVERY_AUDITOR_INCONSISTENCY_CONFIDENCE_THRESHOLD; i++) {
|
||||
Assert.assertTrue(periodicShardSyncManager.checkForShardSync().shouldDoShardSync());
|
||||
Assert.assertFalse(auditorPeriodicShardSyncManager.checkForShardSync().shouldDoShardSync());
|
||||
}
|
||||
|
|
@ -201,7 +204,7 @@ public class PeriodicShardSyncManagerTest {
|
|||
}).collect(Collectors.toList());
|
||||
|
||||
when(leaseManager.listLeases()).thenReturn(leases);
|
||||
for (int i = 1; i < CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY; i++) {
|
||||
for (int i = 1; i < LEASES_RECOVERY_AUDITOR_INCONSISTENCY_CONFIDENCE_THRESHOLD; i++) {
|
||||
Assert.assertTrue(periodicShardSyncManager.checkForShardSync().shouldDoShardSync());
|
||||
Assert.assertFalse(auditorPeriodicShardSyncManager.checkForShardSync().shouldDoShardSync());
|
||||
}
|
||||
|
|
@ -229,7 +232,7 @@ public class PeriodicShardSyncManagerTest {
|
|||
}).collect(Collectors.toList());
|
||||
|
||||
when(leaseManager.listLeases()).thenReturn(leases);
|
||||
for (int i = 1; i < CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY; i++) {
|
||||
for (int i = 1; i < LEASES_RECOVERY_AUDITOR_INCONSISTENCY_CONFIDENCE_THRESHOLD; i++) {
|
||||
Assert.assertTrue(periodicShardSyncManager.checkForShardSync().shouldDoShardSync());
|
||||
Assert.assertFalse(auditorPeriodicShardSyncManager.checkForShardSync().shouldDoShardSync());
|
||||
}
|
||||
|
|
@ -253,7 +256,7 @@ public class PeriodicShardSyncManagerTest {
|
|||
}).collect(Collectors.toList());
|
||||
|
||||
when(leaseManager.listLeases()).thenReturn(leases);
|
||||
for (int i = 1; i < CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY; i++) {
|
||||
for (int i = 1; i < LEASES_RECOVERY_AUDITOR_INCONSISTENCY_CONFIDENCE_THRESHOLD; i++) {
|
||||
Assert.assertTrue(periodicShardSyncManager.checkForShardSync().shouldDoShardSync());
|
||||
Assert.assertFalse(auditorPeriodicShardSyncManager.checkForShardSync().shouldDoShardSync());
|
||||
}
|
||||
|
|
@ -277,7 +280,7 @@ public class PeriodicShardSyncManagerTest {
|
|||
}).collect(Collectors.toList());
|
||||
|
||||
when(leaseManager.listLeases()).thenReturn(leases1);
|
||||
for (int i = 1; i < CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY; i++) {
|
||||
for (int i = 1; i < LEASES_RECOVERY_AUDITOR_INCONSISTENCY_CONFIDENCE_THRESHOLD; i++) {
|
||||
Assert.assertTrue(periodicShardSyncManager.checkForShardSync().shouldDoShardSync());
|
||||
Assert.assertFalse(auditorPeriodicShardSyncManager.checkForShardSync().shouldDoShardSync());
|
||||
}
|
||||
|
|
@ -297,7 +300,7 @@ public class PeriodicShardSyncManagerTest {
|
|||
|
||||
// Resetting the holes
|
||||
when(leaseManager.listLeases()).thenReturn(leases2);
|
||||
for (int i = 1; i < CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY; i++) {
|
||||
for (int i = 1; i < LEASES_RECOVERY_AUDITOR_INCONSISTENCY_CONFIDENCE_THRESHOLD; i++) {
|
||||
Assert.assertTrue(periodicShardSyncManager.checkForShardSync().shouldDoShardSync());
|
||||
Assert.assertFalse(auditorPeriodicShardSyncManager.checkForShardSync().shouldDoShardSync());
|
||||
}
|
||||
|
|
@ -321,7 +324,7 @@ public class PeriodicShardSyncManagerTest {
|
|||
}).collect(Collectors.toList());
|
||||
|
||||
when(leaseManager.listLeases()).thenReturn(leases1);
|
||||
for (int i = 1; i < CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY; i++) {
|
||||
for (int i = 1; i < LEASES_RECOVERY_AUDITOR_INCONSISTENCY_CONFIDENCE_THRESHOLD; i++) {
|
||||
Assert.assertTrue(periodicShardSyncManager.checkForShardSync().shouldDoShardSync());
|
||||
Assert.assertFalse(auditorPeriodicShardSyncManager.checkForShardSync().shouldDoShardSync());
|
||||
}
|
||||
|
|
@ -341,13 +344,13 @@ public class PeriodicShardSyncManagerTest {
|
|||
|
||||
// Resetting the holes
|
||||
when(leaseManager.listLeases()).thenReturn(leases2);
|
||||
for (int i = 1; i < CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY; i++) {
|
||||
for (int i = 1; i < LEASES_RECOVERY_AUDITOR_INCONSISTENCY_CONFIDENCE_THRESHOLD; i++) {
|
||||
Assert.assertTrue(periodicShardSyncManager.checkForShardSync().shouldDoShardSync());
|
||||
Assert.assertFalse(auditorPeriodicShardSyncManager.checkForShardSync().shouldDoShardSync());
|
||||
}
|
||||
// Resetting the holes again
|
||||
when(leaseManager.listLeases()).thenReturn(leases1);
|
||||
for (int i = 1; i < CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY; i++) {
|
||||
for (int i = 1; i < LEASES_RECOVERY_AUDITOR_INCONSISTENCY_CONFIDENCE_THRESHOLD; i++) {
|
||||
Assert.assertTrue(periodicShardSyncManager.checkForShardSync().shouldDoShardSync());
|
||||
Assert.assertFalse(auditorPeriodicShardSyncManager.checkForShardSync().shouldDoShardSync());
|
||||
}
|
||||
|
|
@ -392,7 +395,7 @@ public class PeriodicShardSyncManagerTest {
|
|||
when(leaseManager.listLeases()).thenReturn(leases);
|
||||
|
||||
// Assert that SHARD_END shard sync should never trigger, but PERIODIC shard sync should always trigger
|
||||
for (int i = 1; i < CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY; i++) {
|
||||
for (int i = 1; i < LEASES_RECOVERY_AUDITOR_INCONSISTENCY_CONFIDENCE_THRESHOLD; i++) {
|
||||
Assert.assertTrue(periodicShardSyncManager.checkForShardSync().shouldDoShardSync());
|
||||
Assert.assertFalse(auditorPeriodicShardSyncManager.checkForShardSync().shouldDoShardSync());
|
||||
}
|
||||
|
|
@ -442,7 +445,7 @@ public class PeriodicShardSyncManagerTest {
|
|||
when(leaseManager.listLeases()).thenReturn(leases);
|
||||
|
||||
// Assert that shard sync should trigger after breaching threshold
|
||||
for (int i = 1; i < CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY; i++) {
|
||||
for (int i = 1; i < LEASES_RECOVERY_AUDITOR_INCONSISTENCY_CONFIDENCE_THRESHOLD; i++) {
|
||||
Assert.assertTrue(periodicShardSyncManager.checkForShardSync().shouldDoShardSync());
|
||||
Assert.assertFalse(auditorPeriodicShardSyncManager.checkForShardSync().shouldDoShardSync());
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue