diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java index c84547e2..b4999bec 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java @@ -23,6 +23,7 @@ import lombok.Value; import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.Validate; +import software.amazon.awssdk.services.cloudwatch.model.StandardUnit; import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.kinesis.common.HashKeyRangeForLease; @@ -38,6 +39,10 @@ import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.InvalidStateException; import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; import software.amazon.kinesis.lifecycle.TaskResult; +import software.amazon.kinesis.metrics.MetricsFactory; +import software.amazon.kinesis.metrics.MetricsLevel; +import software.amazon.kinesis.metrics.MetricsScope; +import software.amazon.kinesis.metrics.MetricsUtil; import java.io.Serializable; import java.math.BigInteger; @@ -47,7 +52,6 @@ import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.Executors; @@ -67,13 +71,11 @@ import static software.amazon.kinesis.common.HashKeyRangeForLease.fromHashKeyRan @Slf4j class PeriodicShardSyncManager { private static final long INITIAL_DELAY = 60 * 1000L; - private static final long PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 2 * 60 * 1000L; @VisibleForTesting static final BigInteger MIN_HASH_KEY = BigInteger.ZERO; @VisibleForTesting static final BigInteger MAX_HASH_KEY = new BigInteger("2").pow(128).subtract(BigInteger.ONE); - @VisibleForTesting - static final int CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY = 3; + static final String PERIODIC_SHARD_SYNC_MANAGER = "PeriodicShardSyncManager"; private Map hashRangeHoleTrackerMap = new HashMap<>(); private final String workerId; @@ -83,19 +85,29 @@ class PeriodicShardSyncManager { private final Function shardSyncTaskManagerProvider; private final ScheduledExecutorService shardSyncThreadPool; private final boolean isMultiStreamingMode; + private final MetricsFactory metricsFactory; + private final long leasesRecoveryAuditorExecutionFrequencyMillis; + private final int leasesRecoveryAuditorInconsistencyConfidenceThreshold; private boolean isRunning; PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, LeaseRefresher leaseRefresher, Map currentStreamConfigMap, - Function shardSyncTaskManagerProvider, boolean isMultiStreamingMode) { + Function shardSyncTaskManagerProvider, boolean isMultiStreamingMode, + MetricsFactory metricsFactory, + long leasesRecoveryAuditorExecutionFrequencyMillis, + int leasesRecoveryAuditorInconsistencyConfidenceThreshold) { this(workerId, leaderDecider, leaseRefresher, currentStreamConfigMap, shardSyncTaskManagerProvider, - Executors.newSingleThreadScheduledExecutor(), isMultiStreamingMode); + Executors.newSingleThreadScheduledExecutor(), isMultiStreamingMode, metricsFactory, + leasesRecoveryAuditorExecutionFrequencyMillis, leasesRecoveryAuditorInconsistencyConfidenceThreshold); } PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, LeaseRefresher leaseRefresher, Map currentStreamConfigMap, Function shardSyncTaskManagerProvider, - ScheduledExecutorService shardSyncThreadPool, boolean isMultiStreamingMode) { + ScheduledExecutorService shardSyncThreadPool, boolean isMultiStreamingMode, + MetricsFactory metricsFactory, + long leasesRecoveryAuditorExecutionFrequencyMillis, + int leasesRecoveryAuditorInconsistencyConfidenceThreshold) { Validate.notBlank(workerId, "WorkerID is required to initialize PeriodicShardSyncManager."); Validate.notNull(leaderDecider, "LeaderDecider is required to initialize PeriodicShardSyncManager."); this.workerId = workerId; @@ -105,6 +117,9 @@ class PeriodicShardSyncManager { this.shardSyncTaskManagerProvider = shardSyncTaskManagerProvider; this.shardSyncThreadPool = shardSyncThreadPool; this.isMultiStreamingMode = isMultiStreamingMode; + this.metricsFactory = metricsFactory; + this.leasesRecoveryAuditorExecutionFrequencyMillis = leasesRecoveryAuditorExecutionFrequencyMillis; + this.leasesRecoveryAuditorInconsistencyConfidenceThreshold = leasesRecoveryAuditorInconsistencyConfidenceThreshold; } public synchronized TaskResult start() { @@ -116,7 +131,7 @@ class PeriodicShardSyncManager { log.error("Error during runShardSync.", t); } }; - shardSyncThreadPool.scheduleWithFixedDelay(periodicShardSyncer, INITIAL_DELAY, PERIODIC_SHARD_SYNC_INTERVAL_MILLIS, + shardSyncThreadPool.scheduleWithFixedDelay(periodicShardSyncer, INITIAL_DELAY, leasesRecoveryAuditorExecutionFrequencyMillis, TimeUnit.MILLISECONDS); isRunning = true; @@ -157,6 +172,14 @@ class PeriodicShardSyncManager { private void runShardSync() { if (leaderDecider.isLeader(workerId)) { log.info(String.format("WorkerId %s is leader, running the periodic shard sync task", workerId)); + + final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, + PERIODIC_SHARD_SYNC_MANAGER); + int numStreamsWithPartialLeases = 0; + int numStreamsToSync = 0; + boolean isRunSuccess = false; + final long runStartMillis = System.currentTimeMillis(); + try { // Construct the stream to leases map to be used in the lease sync final Map> streamToLeasesMap = getStreamToLeasesMap( @@ -166,6 +189,10 @@ class PeriodicShardSyncManager { for (Map.Entry streamConfigEntry : currentStreamConfigMap.entrySet()) { final ShardSyncResponse shardSyncResponse = checkForShardSync(streamConfigEntry.getKey(), streamToLeasesMap.get(streamConfigEntry.getKey())); + + numStreamsWithPartialLeases += shardSyncResponse.isHoleDetected() ? 1 : 0; + numStreamsToSync += shardSyncResponse.shouldDoShardSync ? 1 : 0; + if (shardSyncResponse.shouldDoShardSync()) { log.info("Periodic shard syncer initiating shard sync for {} due to the reason - {} ", streamConfigEntry.getKey(), shardSyncResponse.reasonForDecision()); @@ -181,8 +208,14 @@ class PeriodicShardSyncManager { shardSyncResponse.reasonForDecision()); } } + isRunSuccess = true; } catch (Exception e) { log.error("Caught exception while running periodic shard syncer.", e); + } finally { + scope.addData("NumStreamsWithPartialLeases", numStreamsWithPartialLeases, StandardUnit.COUNT, MetricsLevel.SUMMARY); + scope.addData("NumStreamsToSync", numStreamsToSync, StandardUnit.COUNT, MetricsLevel.SUMMARY); + MetricsUtil.addSuccessAndLatency(scope, isRunSuccess, runStartMillis, MetricsLevel.SUMMARY); + scope.end(); } } else { log.debug("WorkerId {} is not a leader, not running the shard sync task", workerId); @@ -214,7 +247,7 @@ class PeriodicShardSyncManager { if (CollectionUtils.isNullOrEmpty(leases)) { // If the leases is null or empty then we need to do shard sync log.info("No leases found for {}. Will be triggering shard sync", streamIdentifier); - return new ShardSyncResponse(true, "No leases found for " + streamIdentifier); + return new ShardSyncResponse(true, false, "No leases found for " + streamIdentifier); } // Check if there are any holes in the leases and return the first hole if present. Optional hashRangeHoleOpt = hasHoleInLeases(streamIdentifier, leases); @@ -227,15 +260,15 @@ class PeriodicShardSyncManager { .computeIfAbsent(streamIdentifier, s -> new HashRangeHoleTracker()); final boolean hasHoleWithHighConfidence = hashRangeHoleTracker .hasHighConfidenceOfHoleWith(hashRangeHoleOpt.get()); - return new ShardSyncResponse(hasHoleWithHighConfidence, + return new ShardSyncResponse(hasHoleWithHighConfidence, true, "Detected same hole for " + hashRangeHoleTracker.getNumConsecutiveHoles() + " times. Shard sync will be initiated when threshold reaches " - + CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY); + + leasesRecoveryAuditorInconsistencyConfidenceThreshold); } else { // If hole is not present, clear any previous tracking for this stream and return false; hashRangeHoleTrackerMap.remove(streamIdentifier); - return new ShardSyncResponse(false, "Hash Ranges are complete for " + streamIdentifier); + return new ShardSyncResponse(false, false, "Hash Ranges are complete for " + streamIdentifier); } } @@ -244,6 +277,7 @@ class PeriodicShardSyncManager { @VisibleForTesting static class ShardSyncResponse { private final boolean shouldDoShardSync; + private final boolean isHoleDetected; private final String reasonForDecision; } @@ -365,7 +399,7 @@ class PeriodicShardSyncManager { private final HashKeyRangeForLease hashRangeAtEndOfPossibleHole; } - private static class HashRangeHoleTracker { + private class HashRangeHoleTracker { private HashRangeHole hashRangeHole; @Getter private Integer numConsecutiveHoles; @@ -377,7 +411,7 @@ class PeriodicShardSyncManager { this.hashRangeHole = hashRangeHole; this.numConsecutiveHoles = 1; } - return numConsecutiveHoles >= CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY; + return numConsecutiveHoles >= leasesRecoveryAuditorInconsistencyConfidenceThreshold; } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index 3e74e23b..b6efc793 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -113,6 +113,7 @@ public class Scheduler implements Runnable { private static final long MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 1 * 1000L; private static final long MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 30 * 1000L; private static final long NEW_STREAM_CHECK_INTERVAL_MILLIS = 1 * 60 * 1000L; + private static final boolean SHOULD_DO_LEASE_SYNC_FOR_OLD_STREAMS = false; private static final String MULTI_STREAM_TRACKER = "MultiStreamTracker"; private static final String ACTIVE_STREAMS_COUNT = "ActiveStreams.Count"; private static final String PENDING_STREAMS_DELETION_COUNT = "StreamsPendingDeletion.Count"; @@ -290,7 +291,9 @@ public class Scheduler implements Runnable { this.schedulerInitializationBackoffTimeMillis = this.coordinatorConfig.schedulerInitializationBackoffTimeMillis(); this.leaderElectedPeriodicShardSyncManager = new PeriodicShardSyncManager( leaseManagementConfig.workerIdentifier(), leaderDecider, leaseRefresher, currentStreamConfigMap, - shardSyncTaskManagerProvider, isMultiStreamMode); + shardSyncTaskManagerProvider, isMultiStreamMode, metricsFactory, + leaseManagementConfig.leasesRecoveryAuditorExecutionFrequencyMillis(), + leaseManagementConfig.leasesRecoveryAuditorInconsistencyConfidenceThreshold()); this.leaseCleanupManager = this.leaseManagementConfig.leaseManagementFactory(leaseSerializer, isMultiStreamMode) .createLeaseCleanupManager(metricsFactory); } @@ -489,10 +492,9 @@ public class Scheduler implements Runnable { } }; - if (formerStreamsLeasesDeletionStrategy.shouldCleanupLeasesForDeletedStreams()) { + if (SHOULD_DO_LEASE_SYNC_FOR_OLD_STREAMS) { // We do lease sync for old streams, before leaving to the deletion strategy to delete leases for - // strategy detected leases. Also, for deleted streams we expect the shard sync to remove the - // leases. + // strategy detected leases. Iterator currentSetOfStreamsIter = currentStreamConfigMap.keySet().iterator(); while (currentSetOfStreamsIter.hasNext()) { StreamIdentifier streamIdentifier = currentSetOfStreamsIter.next(); @@ -531,13 +533,13 @@ public class Scheduler implements Runnable { currentStreamConfigMap.keySet().stream().forEach(streamIdentifier -> enqueueStreamLeaseDeletionOperation.accept(streamIdentifier)); } else if (formerStreamsLeasesDeletionStrategy.leaseDeletionType() == StreamsLeasesDeletionType.PROVIDED_STREAMS_DEFERRED_DELETION) { - Optional.ofNullable(formerStreamsLeasesDeletionStrategy.streamIdentifiers()).ifPresent( + Optional.ofNullable(formerStreamsLeasesDeletionStrategy.streamIdentifiersForLeaseCleanup()).ifPresent( streamIdentifiers -> streamIdentifiers.stream().forEach(streamIdentifier -> enqueueStreamLeaseDeletionOperation.accept(streamIdentifier))); } - // Now let's scan the streamIdentifiers eligible for deferred deletion and delete them. + // Now let's scan the streamIdentifiersForLeaseCleanup eligible for deferred deletion and delete them. // StreamIdentifiers are eligible for deletion only when the deferment period has elapsed and - // the streamIdentifiers are not present in the latest snapshot. + // the streamIdentifiersForLeaseCleanup are not present in the latest snapshot. final Map> staleStreamIdDeletionDecisionMap = staleStreamDeletionMap.keySet().stream().collect(Collectors .partitioningBy(streamIdentifier -> newStreamConfigMap.containsKey(streamIdentifier), Collectors.toSet())); final Set staleStreamIdsToBeDeleted = staleStreamIdDeletionDecisionMap.get(false).stream().filter(streamIdentifier -> diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java index 82c02060..473db5bb 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java @@ -52,6 +52,9 @@ public class LeaseManagementConfig { public static final long DEFAULT_LEASE_CLEANUP_INTERVAL_MILLIS = Duration.ofMinutes(1).toMillis(); public static final long DEFAULT_COMPLETED_LEASE_CLEANUP_INTERVAL_MILLIS = Duration.ofMinutes(5).toMillis(); public static final long DEFAULT_GARBAGE_LEASE_CLEANUP_INTERVAL_MILLIS = Duration.ofMinutes(30).toMillis(); + public static final long DEFAULT_PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 2 * 60 * 1000L; + public static final int DEFAULT_CONSECUTIVE_HOLES_FOR_TRIGGERING_LEASE_RECOVERY = 3; + public static final LeaseCleanupConfig DEFAULT_LEASE_CLEANUP_CONFIG = LeaseCleanupConfig.builder() .leaseCleanupIntervalMillis(DEFAULT_LEASE_CLEANUP_INTERVAL_MILLIS) @@ -195,6 +198,20 @@ public class LeaseManagementConfig { private BillingMode billingMode = BillingMode.PROVISIONED; + /** + * Frequency (in millis) of the auditor job to scan for partial leases in the lease table. + * If the auditor detects any hole in the leases for a stream, then it would trigger shard sync based on + * {@link #leasesRecoveryAuditorInconsistencyConfidenceThreshold} + */ + private long leasesRecoveryAuditorExecutionFrequencyMillis = DEFAULT_PERIODIC_SHARD_SYNC_INTERVAL_MILLIS; + + /** + * Confidence threshold for the periodic auditor job to determine if leases for a stream in the lease table + * is inconsistent. If the auditor finds same set of inconsistencies consecutively for a stream for this many times, + * then it would trigger a shard sync. + */ + private int leasesRecoveryAuditorInconsistencyConfidenceThreshold = DEFAULT_CONSECUTIVE_HOLES_FOR_TRIGGERING_LEASE_RECOVERY; + /** * The initial position for getting records from Kinesis streams. * diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy.java index e59266a4..232c428d 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy.java @@ -29,7 +29,7 @@ public interface FormerStreamsLeasesDeletionStrategy { * StreamIdentifiers for which leases needs to be cleaned up in the lease table. * @return */ - List streamIdentifiers(); + List streamIdentifiersForLeaseCleanup(); /** * Duration to wait before deleting the leases for this stream. @@ -43,12 +43,6 @@ public interface FormerStreamsLeasesDeletionStrategy { */ StreamsLeasesDeletionType leaseDeletionType(); - /** - * Should the leases be cleaned up for deleted streams - * @return true if leases be cleaned up for deleted streams; false otherwise. - */ - boolean shouldCleanupLeasesForDeletedStreams(); - /** * StreamsLeasesDeletionType identifying the different lease cleanup strategies. */ @@ -64,7 +58,7 @@ public interface FormerStreamsLeasesDeletionStrategy { final class NoLeaseDeletionStrategy implements FormerStreamsLeasesDeletionStrategy { @Override - public final List streamIdentifiers() { + public final List streamIdentifiersForLeaseCleanup() { throw new UnsupportedOperationException("StreamIdentifiers not required"); } @@ -77,37 +71,6 @@ public interface FormerStreamsLeasesDeletionStrategy { public final StreamsLeasesDeletionType leaseDeletionType() { return StreamsLeasesDeletionType.NO_STREAMS_LEASES_DELETION; } - - @Override - public final boolean shouldCleanupLeasesForDeletedStreams() { - return false; - } - } - - /** - * Strategy for not cleaning up leases for former streams. - */ - final class OnlyDeletedStreamsLeasesCleanupStrategy implements FormerStreamsLeasesDeletionStrategy { - - @Override - public final List streamIdentifiers() { - throw new UnsupportedOperationException("StreamIdentifiers not required"); - } - - @Override - public final Duration waitPeriodToDeleteFormerStreams() { - return Duration.ZERO; - } - - @Override - public final StreamsLeasesDeletionType leaseDeletionType() { - return StreamsLeasesDeletionType.NO_STREAMS_LEASES_DELETION; - } - - @Override - public final boolean shouldCleanupLeasesForDeletedStreams() { - return true; - } } /** @@ -117,7 +80,7 @@ public interface FormerStreamsLeasesDeletionStrategy { abstract class AutoDetectionAndDeferredDeletionStrategy implements FormerStreamsLeasesDeletionStrategy { @Override - public final List streamIdentifiers() { + public final List streamIdentifiersForLeaseCleanup() { throw new UnsupportedOperationException("StreamIdentifiers not required"); } @@ -125,15 +88,10 @@ public interface FormerStreamsLeasesDeletionStrategy { public final StreamsLeasesDeletionType leaseDeletionType() { return StreamsLeasesDeletionType.FORMER_STREAMS_AUTO_DETECTION_DEFERRED_DELETION; } - - @Override - public boolean shouldCleanupLeasesForDeletedStreams() { - return false; - } } /** - * Strategy to detect the streams for deletion through {@link #streamIdentifiers()} provided by customer at runtime + * Strategy to detect the streams for deletion through {@link #streamIdentifiersForLeaseCleanup()} provided by customer at runtime * and do deferred deletion based on {@link #waitPeriodToDeleteFormerStreams()} */ abstract class ProvidedStreamsDeferredDeletionStrategy implements FormerStreamsLeasesDeletionStrategy { @@ -142,11 +100,6 @@ public interface FormerStreamsLeasesDeletionStrategy { public final StreamsLeasesDeletionType leaseDeletionType() { return StreamsLeasesDeletionType.PROVIDED_STREAMS_DEFERRED_DELETION; } - - @Override - public boolean shouldCleanupLeasesForDeletedStreams() { - return false; - } } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java index 5796862b..35301624 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java @@ -38,9 +38,11 @@ public class FanOutRetrievalFactory implements RetrievalFactory { private final KinesisAsyncClient kinesisClient; private final String defaultStreamName; - private final String defaultConsumerName; + private final String defaultConsumerArn; private final Function consumerArnCreator; + private Map implicitConsumerArnTracker = new HashMap<>(); + @Override public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(final ShardInfo shardInfo, final MetricsFactory metricsFactory) { @@ -52,15 +54,15 @@ public class FanOutRetrievalFactory implements RetrievalFactory { final StreamConfig streamConfig, final MetricsFactory metricsFactory) { final Optional streamIdentifierStr = shardInfo.streamIdentifierSerOpt(); - final String streamName; if(streamIdentifierStr.isPresent()) { - streamName = StreamIdentifier.multiStreamInstance(streamIdentifierStr.get()).streamName(); + final StreamIdentifier streamIdentifier = StreamIdentifier.multiStreamInstance(streamIdentifierStr.get()); return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(), - getOrCreateConsumerArn(streamName, streamConfig.consumerArn()), + getOrCreateConsumerArn(streamIdentifier, streamConfig.consumerArn()), streamIdentifierStr.get()); } else { + final StreamIdentifier streamIdentifier = StreamIdentifier.singleStreamInstance(defaultStreamName); return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(), - getOrCreateConsumerArn(defaultStreamName, defaultConsumerName)); + getOrCreateConsumerArn(streamIdentifier, defaultConsumerArn)); } } @@ -69,7 +71,8 @@ public class FanOutRetrievalFactory implements RetrievalFactory { throw new UnsupportedOperationException("FanoutRetrievalFactory needs StreamConfig Info"); } - private String getOrCreateConsumerArn(String streamName, String consumerArn) { - return consumerArn != null ? consumerArn : consumerArnCreator.apply(streamName); + private String getOrCreateConsumerArn(StreamIdentifier streamIdentifier, String consumerArn) { + return consumerArn != null ? consumerArn : implicitConsumerArnTracker + .computeIfAbsent(streamIdentifier, sId -> consumerArnCreator.apply(sId.streamName())); } } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManagerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManagerTest.java index dfba2791..a2047a6b 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManagerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManagerTest.java @@ -34,6 +34,7 @@ import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.MultiStreamLease; import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardSyncTaskManager; +import software.amazon.kinesis.metrics.NullMetricsFactory; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import java.math.BigInteger; @@ -49,9 +50,9 @@ import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static software.amazon.kinesis.common.HashKeyRangeForLease.deserialize; -import static software.amazon.kinesis.coordinator.PeriodicShardSyncManager.CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY; import static software.amazon.kinesis.coordinator.PeriodicShardSyncManager.MAX_HASH_KEY; import static software.amazon.kinesis.coordinator.PeriodicShardSyncManager.MIN_HASH_KEY; +import static software.amazon.kinesis.leases.LeaseManagementConfig.DEFAULT_CONSECUTIVE_HOLES_FOR_TRIGGERING_LEASE_RECOVERY; @RunWith(MockitoJUnitRunner.class) @@ -72,7 +73,7 @@ public class PeriodicShardSyncManagerTest { public void setup() { streamIdentifier = StreamIdentifier.multiStreamInstance("123:stream:456"); periodicShardSyncManager = new PeriodicShardSyncManager("worker", leaderDecider, leaseRefresher, currentStreamConfigMap, - shardSyncTaskManagerProvider, true); + shardSyncTaskManagerProvider, true, new NullMetricsFactory(), 2 * 60 * 1000, 3); } @Test @@ -173,7 +174,7 @@ public class PeriodicShardSyncManagerTest { lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); return lease; }).collect(Collectors.toList()); - IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert + IntStream.range(1, DEFAULT_CONSECUTIVE_HOLES_FOR_TRIGGERING_LEASE_RECOVERY).forEach(i -> Assert .assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync())); } @@ -191,7 +192,7 @@ public class PeriodicShardSyncManagerTest { lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); return lease; }).collect(Collectors.toList()); - IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert + IntStream.range(1, DEFAULT_CONSECUTIVE_HOLES_FOR_TRIGGERING_LEASE_RECOVERY).forEach(i -> Assert .assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync())); Assert.assertTrue(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()); } @@ -214,7 +215,7 @@ public class PeriodicShardSyncManagerTest { } return lease; }).collect(Collectors.toList()); - IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert + IntStream.range(1, DEFAULT_CONSECUTIVE_HOLES_FOR_TRIGGERING_LEASE_RECOVERY).forEach(i -> Assert .assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync())); Assert.assertTrue(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()); } @@ -233,7 +234,7 @@ public class PeriodicShardSyncManagerTest { lease.checkpoint(ExtendedSequenceNumber.SHARD_END); return lease; }).collect(Collectors.toList()); - IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert + IntStream.range(1, DEFAULT_CONSECUTIVE_HOLES_FOR_TRIGGERING_LEASE_RECOVERY).forEach(i -> Assert .assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync())); Assert.assertTrue(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()); } @@ -252,7 +253,7 @@ public class PeriodicShardSyncManagerTest { lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); return lease; }).collect(Collectors.toList()); - IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert + IntStream.range(1, DEFAULT_CONSECUTIVE_HOLES_FOR_TRIGGERING_LEASE_RECOVERY).forEach(i -> Assert .assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync())); List multiStreamLeases2 = new ArrayList() {{ add(deserialize(MIN_HASH_KEY.toString(), "1")); @@ -267,7 +268,7 @@ public class PeriodicShardSyncManagerTest { return lease; }).collect(Collectors.toList()); // Resetting the holes - IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert + IntStream.range(1, DEFAULT_CONSECUTIVE_HOLES_FOR_TRIGGERING_LEASE_RECOVERY).forEach(i -> Assert .assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases2).shouldDoShardSync())); Assert.assertTrue(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases2).shouldDoShardSync()); } @@ -286,7 +287,7 @@ public class PeriodicShardSyncManagerTest { lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); return lease; }).collect(Collectors.toList()); - IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert + IntStream.range(1, DEFAULT_CONSECUTIVE_HOLES_FOR_TRIGGERING_LEASE_RECOVERY).forEach(i -> Assert .assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync())); List multiStreamLeases2 = new ArrayList() {{ add(deserialize(MIN_HASH_KEY.toString(), "1")); @@ -301,10 +302,10 @@ public class PeriodicShardSyncManagerTest { return lease; }).collect(Collectors.toList()); // Resetting the holes - IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert + IntStream.range(1, DEFAULT_CONSECUTIVE_HOLES_FOR_TRIGGERING_LEASE_RECOVERY).forEach(i -> Assert .assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases2).shouldDoShardSync())); // Resetting the holes - IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert + IntStream.range(1, DEFAULT_CONSECUTIVE_HOLES_FOR_TRIGGERING_LEASE_RECOVERY).forEach(i -> Assert .assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync())); Assert.assertTrue(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()); } @@ -347,7 +348,7 @@ public class PeriodicShardSyncManagerTest { }).collect(Collectors.toList()); // Assert that shard sync should never trigger - IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert + IntStream.range(1, DEFAULT_CONSECUTIVE_HOLES_FOR_TRIGGERING_LEASE_RECOVERY).forEach(i -> Assert .assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync())); Assert.assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()); @@ -395,7 +396,7 @@ public class PeriodicShardSyncManagerTest { }).collect(Collectors.toList()); // Assert that shard sync should never trigger - IntStream.range(1, CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY).forEach(i -> Assert + IntStream.range(1, DEFAULT_CONSECUTIVE_HOLES_FOR_TRIGGERING_LEASE_RECOVERY).forEach(i -> Assert .assertFalse(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync())); Assert.assertTrue(periodicShardSyncManager.checkForShardSync(streamIdentifier, multiStreamLeases).shouldDoShardSync()); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index af58d3ab..9e15d988 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -482,7 +482,7 @@ public class SchedulerTest { public final void testMultiStreamStaleStreamsAreNotDeletedImmediatelyProvidedListStrategy() throws DependencyException, ProvisionedThroughputException, InvalidStateException { when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new ProvidedStreamsDeferredDeletionStrategy() { - @Override public List streamIdentifiers() { + @Override public List streamIdentifiersForLeaseCleanup() { return null; } @@ -497,7 +497,7 @@ public class SchedulerTest { public final void testMultiStreamStaleStreamsAreNotDeletedImmediatelyProvidedListStrategy2() throws DependencyException, ProvisionedThroughputException, InvalidStateException { when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new ProvidedStreamsDeferredDeletionStrategy() { - @Override public List streamIdentifiers() { + @Override public List streamIdentifiersForLeaseCleanup() { return IntStream.range(1, 3).mapToObj(streamId -> StreamIdentifier.multiStreamInstance( Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))).collect( Collectors.toCollection(ArrayList::new)); @@ -555,7 +555,7 @@ public class SchedulerTest { public final void testMultiStreamStaleStreamsAreDeletedAfterDefermentPeriodWithProvidedListStrategy() throws DependencyException, ProvisionedThroughputException, InvalidStateException { when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new ProvidedStreamsDeferredDeletionStrategy() { - @Override public List streamIdentifiers() { + @Override public List streamIdentifiersForLeaseCleanup() { return null; } @@ -575,7 +575,7 @@ public class SchedulerTest { public final void testMultiStreamStaleStreamsAreDeletedAfterDefermentPeriodWithProvidedListStrategy2() throws DependencyException, ProvisionedThroughputException, InvalidStateException { when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new ProvidedStreamsDeferredDeletionStrategy() { - @Override public List streamIdentifiers() { + @Override public List streamIdentifiersForLeaseCleanup() { return IntStream.range(1, 3).mapToObj(streamId -> StreamIdentifier.multiStreamInstance( Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))).collect( Collectors.toCollection(ArrayList::new)); @@ -639,7 +639,7 @@ public class SchedulerTest { public final void testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediatelyWithProvidedListStrategy() throws DependencyException, ProvisionedThroughputException, InvalidStateException { when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new ProvidedStreamsDeferredDeletionStrategy() { - @Override public List streamIdentifiers() { + @Override public List streamIdentifiersForLeaseCleanup() { return null; } @@ -654,7 +654,7 @@ public class SchedulerTest { public final void testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediatelyWithProvidedListStrategy2() throws DependencyException, ProvisionedThroughputException, InvalidStateException { when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new ProvidedStreamsDeferredDeletionStrategy() { - @Override public List streamIdentifiers() { + @Override public List streamIdentifiersForLeaseCleanup() { return IntStream.range(1, 3) .mapToObj(streamId -> StreamIdentifier.multiStreamInstance( Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)))