Periodic Shard Sync Auditor - Metrics and Customer auditor configs

This commit is contained in:
Ashwin Giridharan 2020-06-12 18:12:02 -07:00
parent 1aeb8ed9a1
commit 2530481dba
4 changed files with 82 additions and 27 deletions

View file

@ -23,6 +23,7 @@ import lombok.Value;
import lombok.experimental.Accessors; import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.Validate; import org.apache.commons.lang3.Validate;
import software.amazon.awssdk.services.cloudwatch.model.StandardUnit;
import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.common.HashKeyRangeForLease; import software.amazon.kinesis.common.HashKeyRangeForLease;
@ -38,6 +39,11 @@ import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException; import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.lifecycle.TaskResult; import software.amazon.kinesis.lifecycle.TaskResult;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.MetricsLevel;
import software.amazon.kinesis.metrics.MetricsScope;
import software.amazon.kinesis.metrics.MetricsUtil;
import software.amazon.kinesis.metrics.NullMetricsScope;
import java.io.Serializable; import java.io.Serializable;
import java.math.BigInteger; import java.math.BigInteger;
@ -67,13 +73,11 @@ import static software.amazon.kinesis.common.HashKeyRangeForLease.fromHashKeyRan
@Slf4j @Slf4j
class PeriodicShardSyncManager { class PeriodicShardSyncManager {
private static final long INITIAL_DELAY = 60 * 1000L; private static final long INITIAL_DELAY = 60 * 1000L;
private static final long PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 2 * 60 * 1000L;
@VisibleForTesting @VisibleForTesting
static final BigInteger MIN_HASH_KEY = BigInteger.ZERO; static final BigInteger MIN_HASH_KEY = BigInteger.ZERO;
@VisibleForTesting @VisibleForTesting
static final BigInteger MAX_HASH_KEY = new BigInteger("2").pow(128).subtract(BigInteger.ONE); static final BigInteger MAX_HASH_KEY = new BigInteger("2").pow(128).subtract(BigInteger.ONE);
@VisibleForTesting static final String PERODIC_SHARD_SYNC_MANAGER = "PeriodicShardSyncManager";
static final int CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY = 3;
private Map<StreamIdentifier, HashRangeHoleTracker> hashRangeHoleTrackerMap = new HashMap<>(); private Map<StreamIdentifier, HashRangeHoleTracker> hashRangeHoleTrackerMap = new HashMap<>();
private final String workerId; private final String workerId;
@ -83,19 +87,29 @@ class PeriodicShardSyncManager {
private final Function<StreamConfig, ShardSyncTaskManager> shardSyncTaskManagerProvider; private final Function<StreamConfig, ShardSyncTaskManager> shardSyncTaskManagerProvider;
private final ScheduledExecutorService shardSyncThreadPool; private final ScheduledExecutorService shardSyncThreadPool;
private final boolean isMultiStreamingMode; private final boolean isMultiStreamingMode;
private final MetricsFactory metricsFactory;
private final long leasesRecoveryAuditorExecutionFrequencyMillis;
private final int leasesRecoveryAuditorInconsistencyConfidenceThreshold;
private boolean isRunning; private boolean isRunning;
PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, LeaseRefresher leaseRefresher, PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, LeaseRefresher leaseRefresher,
Map<StreamIdentifier, StreamConfig> currentStreamConfigMap, Map<StreamIdentifier, StreamConfig> currentStreamConfigMap,
Function<StreamConfig, ShardSyncTaskManager> shardSyncTaskManagerProvider, boolean isMultiStreamingMode) { Function<StreamConfig, ShardSyncTaskManager> shardSyncTaskManagerProvider, boolean isMultiStreamingMode,
MetricsFactory metricsFactory,
long leasesRecoveryAuditorExecutionFrequencyMillis,
int leasesRecoveryAuditorInconsistencyConfidenceThreshold) {
this(workerId, leaderDecider, leaseRefresher, currentStreamConfigMap, shardSyncTaskManagerProvider, this(workerId, leaderDecider, leaseRefresher, currentStreamConfigMap, shardSyncTaskManagerProvider,
Executors.newSingleThreadScheduledExecutor(), isMultiStreamingMode); Executors.newSingleThreadScheduledExecutor(), isMultiStreamingMode, metricsFactory,
leasesRecoveryAuditorExecutionFrequencyMillis, leasesRecoveryAuditorInconsistencyConfidenceThreshold);
} }
PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, LeaseRefresher leaseRefresher, PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, LeaseRefresher leaseRefresher,
Map<StreamIdentifier, StreamConfig> currentStreamConfigMap, Map<StreamIdentifier, StreamConfig> currentStreamConfigMap,
Function<StreamConfig, ShardSyncTaskManager> shardSyncTaskManagerProvider, Function<StreamConfig, ShardSyncTaskManager> shardSyncTaskManagerProvider,
ScheduledExecutorService shardSyncThreadPool, boolean isMultiStreamingMode) { ScheduledExecutorService shardSyncThreadPool, boolean isMultiStreamingMode,
MetricsFactory metricsFactory,
long leasesRecoveryAuditorExecutionFrequencyMillis,
int leasesRecoveryAuditorInconsistencyConfidenceThreshold) {
Validate.notBlank(workerId, "WorkerID is required to initialize PeriodicShardSyncManager."); Validate.notBlank(workerId, "WorkerID is required to initialize PeriodicShardSyncManager.");
Validate.notNull(leaderDecider, "LeaderDecider is required to initialize PeriodicShardSyncManager."); Validate.notNull(leaderDecider, "LeaderDecider is required to initialize PeriodicShardSyncManager.");
this.workerId = workerId; this.workerId = workerId;
@ -105,6 +119,9 @@ class PeriodicShardSyncManager {
this.shardSyncTaskManagerProvider = shardSyncTaskManagerProvider; this.shardSyncTaskManagerProvider = shardSyncTaskManagerProvider;
this.shardSyncThreadPool = shardSyncThreadPool; this.shardSyncThreadPool = shardSyncThreadPool;
this.isMultiStreamingMode = isMultiStreamingMode; this.isMultiStreamingMode = isMultiStreamingMode;
this.metricsFactory = metricsFactory;
this.leasesRecoveryAuditorExecutionFrequencyMillis = leasesRecoveryAuditorExecutionFrequencyMillis;
this.leasesRecoveryAuditorInconsistencyConfidenceThreshold = leasesRecoveryAuditorInconsistencyConfidenceThreshold;
} }
public synchronized TaskResult start() { public synchronized TaskResult start() {
@ -116,7 +133,7 @@ class PeriodicShardSyncManager {
log.error("Error during runShardSync.", t); log.error("Error during runShardSync.", t);
} }
}; };
shardSyncThreadPool.scheduleWithFixedDelay(periodicShardSyncer, INITIAL_DELAY, PERIODIC_SHARD_SYNC_INTERVAL_MILLIS, shardSyncThreadPool.scheduleWithFixedDelay(periodicShardSyncer, INITIAL_DELAY, leasesRecoveryAuditorExecutionFrequencyMillis,
TimeUnit.MILLISECONDS); TimeUnit.MILLISECONDS);
isRunning = true; isRunning = true;
@ -157,6 +174,13 @@ class PeriodicShardSyncManager {
private void runShardSync() { private void runShardSync() {
if (leaderDecider.isLeader(workerId)) { if (leaderDecider.isLeader(workerId)) {
log.info(String.format("WorkerId %s is leader, running the periodic shard sync task", workerId)); log.info(String.format("WorkerId %s is leader, running the periodic shard sync task", workerId));
final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, PERODIC_SHARD_SYNC_MANAGER);
int numStreamsWithPartialLeases = 0;
int numStreamsToSync = 0;
boolean isRunSuccess = false;
final long runStartMillis = System.currentTimeMillis();
try { try {
// Construct the stream to leases map to be used in the lease sync // Construct the stream to leases map to be used in the lease sync
final Map<StreamIdentifier, List<Lease>> streamToLeasesMap = getStreamToLeasesMap( final Map<StreamIdentifier, List<Lease>> streamToLeasesMap = getStreamToLeasesMap(
@ -166,6 +190,10 @@ class PeriodicShardSyncManager {
for (Map.Entry<StreamIdentifier, StreamConfig> streamConfigEntry : currentStreamConfigMap.entrySet()) { for (Map.Entry<StreamIdentifier, StreamConfig> streamConfigEntry : currentStreamConfigMap.entrySet()) {
final ShardSyncResponse shardSyncResponse = checkForShardSync(streamConfigEntry.getKey(), final ShardSyncResponse shardSyncResponse = checkForShardSync(streamConfigEntry.getKey(),
streamToLeasesMap.get(streamConfigEntry.getKey())); streamToLeasesMap.get(streamConfigEntry.getKey()));
numStreamsWithPartialLeases += shardSyncResponse.isHoleDetected() ? 1 : 0;
numStreamsToSync += shardSyncResponse.shouldDoShardSync ? 1 : 0;
if (shardSyncResponse.shouldDoShardSync()) { if (shardSyncResponse.shouldDoShardSync()) {
log.info("Periodic shard syncer initiating shard sync for {} due to the reason - {} ", log.info("Periodic shard syncer initiating shard sync for {} due to the reason - {} ",
streamConfigEntry.getKey(), shardSyncResponse.reasonForDecision()); streamConfigEntry.getKey(), shardSyncResponse.reasonForDecision());
@ -181,8 +209,14 @@ class PeriodicShardSyncManager {
shardSyncResponse.reasonForDecision()); shardSyncResponse.reasonForDecision());
} }
} }
isRunSuccess = true;
} catch (Exception e) { } catch (Exception e) {
log.error("Caught exception while running periodic shard syncer.", e); log.error("Caught exception while running periodic shard syncer.", e);
} finally {
scope.addData("NumStreamsWithPartialLeases", numStreamsWithPartialLeases, StandardUnit.COUNT, MetricsLevel.SUMMARY);
scope.addData("NumStreamsToSync", numStreamsToSync, StandardUnit.COUNT, MetricsLevel.SUMMARY);
MetricsUtil.addSuccessAndLatency(scope, isRunSuccess, runStartMillis, MetricsLevel.SUMMARY);
scope.end();
} }
} else { } else {
log.debug("WorkerId {} is not a leader, not running the shard sync task", workerId); log.debug("WorkerId {} is not a leader, not running the shard sync task", workerId);
@ -214,7 +248,7 @@ class PeriodicShardSyncManager {
if (CollectionUtils.isNullOrEmpty(leases)) { if (CollectionUtils.isNullOrEmpty(leases)) {
// If the leases is null or empty then we need to do shard sync // If the leases is null or empty then we need to do shard sync
log.info("No leases found for {}. Will be triggering shard sync", streamIdentifier); log.info("No leases found for {}. Will be triggering shard sync", streamIdentifier);
return new ShardSyncResponse(true, "No leases found for " + streamIdentifier); return new ShardSyncResponse(true, false,"No leases found for " + streamIdentifier);
} }
// Check if there are any holes in the leases and return the first hole if present. // Check if there are any holes in the leases and return the first hole if present.
Optional<HashRangeHole> hashRangeHoleOpt = hasHoleInLeases(streamIdentifier, leases); Optional<HashRangeHole> hashRangeHoleOpt = hasHoleInLeases(streamIdentifier, leases);
@ -227,15 +261,15 @@ class PeriodicShardSyncManager {
.computeIfAbsent(streamIdentifier, s -> new HashRangeHoleTracker()); .computeIfAbsent(streamIdentifier, s -> new HashRangeHoleTracker());
final boolean hasHoleWithHighConfidence = hashRangeHoleTracker final boolean hasHoleWithHighConfidence = hashRangeHoleTracker
.hasHighConfidenceOfHoleWith(hashRangeHoleOpt.get()); .hasHighConfidenceOfHoleWith(hashRangeHoleOpt.get());
return new ShardSyncResponse(hasHoleWithHighConfidence, return new ShardSyncResponse(hasHoleWithHighConfidence, true,
"Detected same hole for " + hashRangeHoleTracker.getNumConsecutiveHoles() "Detected same hole for " + hashRangeHoleTracker.getNumConsecutiveHoles()
+ " times. Shard sync will be initiated when threshold reaches " + " times. Shard sync will be initiated when threshold reaches "
+ CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY); + leasesRecoveryAuditorInconsistencyConfidenceThreshold);
} else { } else {
// If hole is not present, clear any previous tracking for this stream and return false; // If hole is not present, clear any previous tracking for this stream and return false;
hashRangeHoleTrackerMap.remove(streamIdentifier); hashRangeHoleTrackerMap.remove(streamIdentifier);
return new ShardSyncResponse(false, "Hash Ranges are complete for " + streamIdentifier); return new ShardSyncResponse(false, false, "Hash Ranges are complete for " + streamIdentifier);
} }
} }
@ -244,6 +278,7 @@ class PeriodicShardSyncManager {
@VisibleForTesting @VisibleForTesting
static class ShardSyncResponse { static class ShardSyncResponse {
private final boolean shouldDoShardSync; private final boolean shouldDoShardSync;
private final boolean isHoleDetected;
private final String reasonForDecision; private final String reasonForDecision;
} }
@ -365,7 +400,7 @@ class PeriodicShardSyncManager {
private final HashKeyRangeForLease hashRangeAtEndOfPossibleHole; private final HashKeyRangeForLease hashRangeAtEndOfPossibleHole;
} }
private static class HashRangeHoleTracker { private class HashRangeHoleTracker {
private HashRangeHole hashRangeHole; private HashRangeHole hashRangeHole;
@Getter @Getter
private Integer numConsecutiveHoles; private Integer numConsecutiveHoles;
@ -377,7 +412,7 @@ class PeriodicShardSyncManager {
this.hashRangeHole = hashRangeHole; this.hashRangeHole = hashRangeHole;
this.numConsecutiveHoles = 1; this.numConsecutiveHoles = 1;
} }
return numConsecutiveHoles >= CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY; return numConsecutiveHoles >= leasesRecoveryAuditorInconsistencyConfidenceThreshold;
} }
} }

View file

@ -290,7 +290,9 @@ public class Scheduler implements Runnable {
this.schedulerInitializationBackoffTimeMillis = this.coordinatorConfig.schedulerInitializationBackoffTimeMillis(); this.schedulerInitializationBackoffTimeMillis = this.coordinatorConfig.schedulerInitializationBackoffTimeMillis();
this.leaderElectedPeriodicShardSyncManager = new PeriodicShardSyncManager( this.leaderElectedPeriodicShardSyncManager = new PeriodicShardSyncManager(
leaseManagementConfig.workerIdentifier(), leaderDecider, leaseRefresher, currentStreamConfigMap, leaseManagementConfig.workerIdentifier(), leaderDecider, leaseRefresher, currentStreamConfigMap,
shardSyncTaskManagerProvider, isMultiStreamMode); shardSyncTaskManagerProvider, isMultiStreamMode, metricsFactory,
leaseManagementConfig.leasesRecoveryAuditorExecutionFrequencyMillis(),
leaseManagementConfig.leasesRecoveryAuditorInconsistencyConfidenceThreshold());
this.leaseCleanupManager = this.leaseManagementConfig.leaseManagementFactory(leaseSerializer, isMultiStreamMode) this.leaseCleanupManager = this.leaseManagementConfig.leaseManagementFactory(leaseSerializer, isMultiStreamMode)
.createLeaseCleanupManager(metricsFactory); .createLeaseCleanupManager(metricsFactory);
} }

View file

@ -52,6 +52,9 @@ public class LeaseManagementConfig {
public static final long DEFAULT_LEASE_CLEANUP_INTERVAL_MILLIS = Duration.ofMinutes(1).toMillis(); public static final long DEFAULT_LEASE_CLEANUP_INTERVAL_MILLIS = Duration.ofMinutes(1).toMillis();
public static final long DEFAULT_COMPLETED_LEASE_CLEANUP_INTERVAL_MILLIS = Duration.ofMinutes(5).toMillis(); public static final long DEFAULT_COMPLETED_LEASE_CLEANUP_INTERVAL_MILLIS = Duration.ofMinutes(5).toMillis();
public static final long DEFAULT_GARBAGE_LEASE_CLEANUP_INTERVAL_MILLIS = Duration.ofMinutes(30).toMillis(); public static final long DEFAULT_GARBAGE_LEASE_CLEANUP_INTERVAL_MILLIS = Duration.ofMinutes(30).toMillis();
public static final long DEFAULT_PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 2 * 60 * 1000L;
public static final int DEFAULT_CONSECUTIVE_HOLES_FOR_TRIGGERING_LEASE_RECOVERY = 3;
public static final LeaseCleanupConfig DEFAULT_LEASE_CLEANUP_CONFIG = LeaseCleanupConfig.builder() public static final LeaseCleanupConfig DEFAULT_LEASE_CLEANUP_CONFIG = LeaseCleanupConfig.builder()
.leaseCleanupIntervalMillis(DEFAULT_LEASE_CLEANUP_INTERVAL_MILLIS) .leaseCleanupIntervalMillis(DEFAULT_LEASE_CLEANUP_INTERVAL_MILLIS)
@ -195,6 +198,20 @@ public class LeaseManagementConfig {
private BillingMode billingMode = BillingMode.PROVISIONED; private BillingMode billingMode = BillingMode.PROVISIONED;
/**
* Frequency (in millis) of the auditor job to scan for partial leases in the lease table.
* If the auditor detects any hole in the leases for a stream, then it would trigger shard sync based on
* {@link #leasesRecoveryAuditorInconsistencyConfidenceThreshold}
*/
private long leasesRecoveryAuditorExecutionFrequencyMillis = DEFAULT_PERIODIC_SHARD_SYNC_INTERVAL_MILLIS;
/**
* Confidence threshold for the periodic auditor job to determine if leases for a stream in the lease table
* is inconsistent. If the auditor finds same set of inconsistencies consecutively for a stream for this many times,
* then it would trigger a shard sync.
*/
private int leasesRecoveryAuditorInconsistencyConfidenceThreshold = DEFAULT_CONSECUTIVE_HOLES_FOR_TRIGGERING_LEASE_RECOVERY;
/** /**
* The initial position for getting records from Kinesis streams. * The initial position for getting records from Kinesis streams.
* *

View file

@ -34,6 +34,7 @@ import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.MultiStreamLease; import software.amazon.kinesis.leases.MultiStreamLease;
import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardSyncTaskManager; import software.amazon.kinesis.leases.ShardSyncTaskManager;
import software.amazon.kinesis.metrics.NullMetricsFactory;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import java.math.BigInteger; import java.math.BigInteger;
@ -49,9 +50,9 @@ import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import static software.amazon.kinesis.common.HashKeyRangeForLease.deserialize; import static software.amazon.kinesis.common.HashKeyRangeForLease.deserialize;
import static software.amazon.kinesis.coordinator.PeriodicShardSyncManager.CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY;
import static software.amazon.kinesis.coordinator.PeriodicShardSyncManager.MAX_HASH_KEY; import static software.amazon.kinesis.coordinator.PeriodicShardSyncManager.MAX_HASH_KEY;
import static software.amazon.kinesis.coordinator.PeriodicShardSyncManager.MIN_HASH_KEY; import static software.amazon.kinesis.coordinator.PeriodicShardSyncManager.MIN_HASH_KEY;
import static software.amazon.kinesis.leases.LeaseManagementConfig.DEFAULT_CONSECUTIVE_HOLES_FOR_TRIGGERING_LEASE_RECOVERY;
@RunWith(MockitoJUnitRunner.class) @RunWith(MockitoJUnitRunner.class)
@ -72,7 +73,7 @@ public class PeriodicShardSyncManagerTest {
public void setup() { public void setup() {
streamIdentifier = StreamIdentifier.multiStreamInstance("123:stream:456"); streamIdentifier = StreamIdentifier.multiStreamInstance("123:stream:456");
periodicShardSyncManager = new PeriodicShardSyncManager("worker", leaderDecider, leaseRefresher, currentStreamConfigMap, periodicShardSyncManager = new PeriodicShardSyncManager("worker", leaderDecider, leaseRefresher, currentStreamConfigMap,
shardSyncTaskManagerProvider, true); shardSyncTaskManagerProvider, true, new NullMetricsFactory(), 2 * 60 * 1000, 3);
} }
@Test @Test
@ -173,7 +174,7 @@ public class PeriodicShardSyncManagerTest {
lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
return lease; return lease;
}).collect(Collectors.toList()); }).collect(Collectors.toList());
IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert IntStream.range(1, DEFAULT_CONSECUTIVE_HOLES_FOR_TRIGGERING_LEASE_RECOVERY).forEach(i -> Assert
.assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync())); .assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()));
} }
@ -191,7 +192,7 @@ public class PeriodicShardSyncManagerTest {
lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
return lease; return lease;
}).collect(Collectors.toList()); }).collect(Collectors.toList());
IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert IntStream.range(1, DEFAULT_CONSECUTIVE_HOLES_FOR_TRIGGERING_LEASE_RECOVERY).forEach(i -> Assert
.assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync())); .assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()));
Assert.assertTrue(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()); Assert.assertTrue(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync());
} }
@ -214,7 +215,7 @@ public class PeriodicShardSyncManagerTest {
} }
return lease; return lease;
}).collect(Collectors.toList()); }).collect(Collectors.toList());
IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert IntStream.range(1, DEFAULT_CONSECUTIVE_HOLES_FOR_TRIGGERING_LEASE_RECOVERY).forEach(i -> Assert
.assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync())); .assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()));
Assert.assertTrue(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()); Assert.assertTrue(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync());
} }
@ -233,7 +234,7 @@ public class PeriodicShardSyncManagerTest {
lease.checkpoint(ExtendedSequenceNumber.SHARD_END); lease.checkpoint(ExtendedSequenceNumber.SHARD_END);
return lease; return lease;
}).collect(Collectors.toList()); }).collect(Collectors.toList());
IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert IntStream.range(1, DEFAULT_CONSECUTIVE_HOLES_FOR_TRIGGERING_LEASE_RECOVERY).forEach(i -> Assert
.assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync())); .assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()));
Assert.assertTrue(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()); Assert.assertTrue(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync());
} }
@ -252,7 +253,7 @@ public class PeriodicShardSyncManagerTest {
lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
return lease; return lease;
}).collect(Collectors.toList()); }).collect(Collectors.toList());
IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert IntStream.range(1, DEFAULT_CONSECUTIVE_HOLES_FOR_TRIGGERING_LEASE_RECOVERY).forEach(i -> Assert
.assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync())); .assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()));
List<Lease> multiStreamLeases2 = new ArrayList<HashKeyRangeForLease>() {{ List<Lease> multiStreamLeases2 = new ArrayList<HashKeyRangeForLease>() {{
add(deserialize(MIN_HASH_KEY.toString(), "1")); add(deserialize(MIN_HASH_KEY.toString(), "1"));
@ -267,7 +268,7 @@ public class PeriodicShardSyncManagerTest {
return lease; return lease;
}).collect(Collectors.toList()); }).collect(Collectors.toList());
// Resetting the holes // Resetting the holes
IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert IntStream.range(1, DEFAULT_CONSECUTIVE_HOLES_FOR_TRIGGERING_LEASE_RECOVERY).forEach(i -> Assert
.assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases2).shouldDoShardSync())); .assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases2).shouldDoShardSync()));
Assert.assertTrue(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases2).shouldDoShardSync()); Assert.assertTrue(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases2).shouldDoShardSync());
} }
@ -286,7 +287,7 @@ public class PeriodicShardSyncManagerTest {
lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
return lease; return lease;
}).collect(Collectors.toList()); }).collect(Collectors.toList());
IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert IntStream.range(1, DEFAULT_CONSECUTIVE_HOLES_FOR_TRIGGERING_LEASE_RECOVERY).forEach(i -> Assert
.assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync())); .assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()));
List<Lease> multiStreamLeases2 = new ArrayList<HashKeyRangeForLease>() {{ List<Lease> multiStreamLeases2 = new ArrayList<HashKeyRangeForLease>() {{
add(deserialize(MIN_HASH_KEY.toString(), "1")); add(deserialize(MIN_HASH_KEY.toString(), "1"));
@ -301,10 +302,10 @@ public class PeriodicShardSyncManagerTest {
return lease; return lease;
}).collect(Collectors.toList()); }).collect(Collectors.toList());
// Resetting the holes // Resetting the holes
IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert IntStream.range(1, DEFAULT_CONSECUTIVE_HOLES_FOR_TRIGGERING_LEASE_RECOVERY).forEach(i -> Assert
.assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases2).shouldDoShardSync())); .assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases2).shouldDoShardSync()));
// Resetting the holes // Resetting the holes
IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert IntStream.range(1, DEFAULT_CONSECUTIVE_HOLES_FOR_TRIGGERING_LEASE_RECOVERY).forEach(i -> Assert
.assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync())); .assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()));
Assert.assertTrue(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()); Assert.assertTrue(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync());
} }
@ -347,7 +348,7 @@ public class PeriodicShardSyncManagerTest {
}).collect(Collectors.toList()); }).collect(Collectors.toList());
// Assert that shard sync should never trigger // Assert that shard sync should never trigger
IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert IntStream.range(1, DEFAULT_CONSECUTIVE_HOLES_FOR_TRIGGERING_LEASE_RECOVERY).forEach(i -> Assert
.assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync())); .assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()));
Assert.assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()); Assert.assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync());
@ -395,7 +396,7 @@ public class PeriodicShardSyncManagerTest {
}).collect(Collectors.toList()); }).collect(Collectors.toList());
// Assert that shard sync should never trigger // Assert that shard sync should never trigger
IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert IntStream.range(1, DEFAULT_CONSECUTIVE_HOLES_FOR_TRIGGERING_LEASE_RECOVERY).forEach(i -> Assert
.assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync())); .assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()));
Assert.assertTrue(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()); Assert.assertTrue(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync());