From 038524e0b154a78b0991edb4046dc966739313b5 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Fri, 24 Apr 2020 00:01:19 -0700 Subject: [PATCH] Introducing lease deletion strategy for multistreaming --- .../amazon/kinesis/common/ConfigsBuilder.java | 10 +- .../amazon/kinesis/coordinator/Scheduler.java | 178 ++++++++++-------- .../amazon/kinesis/metrics/MetricsUtil.java | 5 + .../FormerStreamsLeasesDeletionStrategy.java | 109 +++++++++++ .../kinesis/processor/MultiStreamTracker.java | 12 +- .../kinesis/coordinator/SchedulerTest.java | 177 ++++++++++++++++- 6 files changed, 399 insertions(+), 92 deletions(-) create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy.java diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/ConfigsBuilder.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/ConfigsBuilder.java index 9595fdf9..09d28495 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/ConfigsBuilder.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/ConfigsBuilder.java @@ -124,7 +124,10 @@ public class ConfigsBuilder { * @param workerIdentifier * @param shardRecordProcessorFactory */ - public ConfigsBuilder(@NonNull String streamName, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { + public ConfigsBuilder(@NonNull String streamName, @NonNull String applicationName, + @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, + @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, + @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.right(streamName); this.applicationName 
= applicationName; this.kinesisClient = kinesisClient; @@ -144,7 +147,10 @@ public class ConfigsBuilder { * @param workerIdentifier * @param shardRecordProcessorFactory */ - public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { + public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName, + @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, + @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, + @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.left(multiStreamTracker); this.applicationName = applicationName; this.kinesisClient = kinesisClient; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index 12cbae8f..e520fdce 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -26,7 +26,6 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; @@ -40,6 +39,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; import lombok.AccessLevel; @@ -83,7 +83,11 @@ import 
software.amazon.kinesis.lifecycle.TaskResult; import software.amazon.kinesis.metrics.CloudWatchMetricsFactory; import software.amazon.kinesis.metrics.MetricsConfig; import software.amazon.kinesis.metrics.MetricsFactory; +import software.amazon.kinesis.metrics.MetricsLevel; +import software.amazon.kinesis.metrics.MetricsScope; +import software.amazon.kinesis.metrics.MetricsUtil; import software.amazon.kinesis.processor.Checkpointer; +import software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy; import software.amazon.kinesis.processor.MultiStreamTracker; import software.amazon.kinesis.processor.ProcessorConfig; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; @@ -92,6 +96,8 @@ import software.amazon.kinesis.retrieval.AggregatorUtil; import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RetrievalConfig; +import static software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy.StreamsLeasesDeletionType; + /** * */ @@ -106,6 +112,10 @@ public class Scheduler implements Runnable { private static final long MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 30 * 1000L; private static final long HASH_RANGE_COVERAGE_CHECK_FREQUENCY_MILLIS = 5000L; private static final long NEW_STREAM_CHECK_INTERVAL_MILLIS = 1 * 60 * 1000L; + private static final String MULTI_STREAM_TRACKER = "MultiStreamTracker"; + private static final String ACTIVE_STREAMS_COUNT = "ActiveStreams.Count"; + private static final String PENDING_STREAMS_DELETION_COUNT = "StreamsPendingDeletion.Count"; + private static final String DELETED_STREAMS_COUNT = "DeletedStreams.Count"; private SchedulerLog slog = new SchedulerLog(); @@ -143,6 +153,7 @@ public class Scheduler implements Runnable { private final boolean isMultiStreamMode; private final Map currentStreamConfigMap; private MultiStreamTracker multiStreamTracker; + private FormerStreamsLeasesDeletionStrategy formerStreamsLeasesDeletionStrategy; private final long 
listShardsBackoffTimeMillis; private final int maxListShardsRetryAttempts; private final LeaseRefresher leaseRefresher; @@ -212,6 +223,7 @@ public class Scheduler implements Runnable { this.currentStreamConfigMap = this.retrievalConfig.appStreamTracker().map( multiStreamTracker -> { this.multiStreamTracker = multiStreamTracker; + this.formerStreamsLeasesDeletionStrategy = multiStreamTracker.formerStreamsLeasesDeletionStrategy(); return multiStreamTracker.streamConfigList().stream() .collect(Collectors.toMap(sc -> sc.streamIdentifier(), sc -> sc)); }, @@ -457,92 +469,108 @@ public class Scheduler implements Runnable { final Set streamsSynced = new HashSet<>(); if (shouldSyncStreamsNow()) { - final Map newStreamConfigMap = new HashMap<>(); - final Duration waitPeriodToDeleteOldStreams = multiStreamTracker.waitPeriodToDeleteOldStreams(); - // Making an immutable copy - newStreamConfigMap.putAll(multiStreamTracker.streamConfigList().stream() - .collect(Collectors.toMap(sc -> sc.streamIdentifier(), sc -> sc))); + final MetricsScope metricsScope = MetricsUtil.createMetricsWithOperation(metricsFactory, MULTI_STREAM_TRACKER); - List leases; + try { - // This is done to ensure that we clean up the stale streams lingering in the lease table. 
- if (!leasesSyncedOnAppInit && isMultiStreamMode) { - leases = fetchMultiStreamLeases(); - syncStreamsFromLeaseTableOnAppInit(leases); - leasesSyncedOnAppInit = true; - } + final Map newStreamConfigMap = new HashMap<>(); + final Duration waitPeriodToDeleteOldStreams = formerStreamsLeasesDeletionStrategy.waitPeriodToDeleteFormerStreams(); + // Making an immutable copy + newStreamConfigMap.putAll(multiStreamTracker.streamConfigList().stream() + .collect(Collectors.toMap(sc -> sc.streamIdentifier(), sc -> sc))); - // For new streams discovered, do a shard sync and update the currentStreamConfigMap - for (StreamIdentifier streamIdentifier : newStreamConfigMap.keySet()) { - if (!currentStreamConfigMap.containsKey(streamIdentifier)) { - log.info("Found new stream to process: " + streamIdentifier + ". Syncing shards of that stream."); - ShardSyncTaskManager shardSyncTaskManager = createOrGetShardSyncTaskManager(newStreamConfigMap.get(streamIdentifier)); - shardSyncTaskManager.syncShardAndLeaseInfo(); - currentStreamConfigMap.put(streamIdentifier, newStreamConfigMap.get(streamIdentifier)); - streamsSynced.add(streamIdentifier); - } else { - if (log.isDebugEnabled()) { - log.debug(streamIdentifier + " is already being processed - skipping shard sync."); + List leases; + + // This is done to ensure that we clean up the stale streams lingering in the lease table. + if (!leasesSyncedOnAppInit && isMultiStreamMode) { + leases = fetchMultiStreamLeases(); + syncStreamsFromLeaseTableOnAppInit(leases); + leasesSyncedOnAppInit = true; + } + + // For new streams discovered, do a shard sync and update the currentStreamConfigMap + for (StreamIdentifier streamIdentifier : newStreamConfigMap.keySet()) { + if (!currentStreamConfigMap.containsKey(streamIdentifier)) { + log.info("Found new stream to process: " + streamIdentifier + ". 
Syncing shards of that stream."); + ShardSyncTaskManager shardSyncTaskManager = createOrGetShardSyncTaskManager(newStreamConfigMap.get(streamIdentifier)); + shardSyncTaskManager.syncShardAndLeaseInfo(); + currentStreamConfigMap.put(streamIdentifier, newStreamConfigMap.get(streamIdentifier)); + streamsSynced.add(streamIdentifier); + } else { + if (log.isDebugEnabled()) { + log.debug(streamIdentifier + " is already being processed - skipping shard sync."); + } } } - } - // Now, we are identifying the stale/old streams and enqueuing it for deferred deletion. - // It is assumed that all the workers will always have the latest and consistent snapshot of streams - // from the multiStreamTracker. - // - // The following streams transition state among two workers are NOT considered safe, where Worker 2, on - // initialization learn about D from lease table and delete the leases for D, as it is not available - // in its latest MultiStreamTracker. - // Worker 1 : A,B,C -> A,B,C,D (latest) - // Worker 2 : BOOTS_UP -> A,B,C (stale) - // - // The following streams transition state among two workers are NOT considered safe, where Worker 2 might - // end up deleting the leases for A and D and loose progress made so far. - // Worker 1 : A,B,C -> A,B,C,D (latest) - // Worker 2 : A,B,C -> B,C (stale/partial) - // - // In order to give workers with stale stream info, sufficient time to learn about the new streams - // before attempting to delete it, we will be deferring the leases deletion based on the - // defer time period. 
+ final Consumer enqueueStreamLeaseDeletionOperation = streamIdentifier -> { + if (!newStreamConfigMap.containsKey(streamIdentifier)) { + staleStreamDeletionMap.putIfAbsent(streamIdentifier, Instant.now()); + } + }; - Iterator currentStreamConfigIter = currentStreamConfigMap.keySet().iterator(); - while (currentStreamConfigIter.hasNext()) { - StreamIdentifier streamIdentifier = currentStreamConfigIter.next(); - if (!newStreamConfigMap.containsKey(streamIdentifier)) { - staleStreamDeletionMap.putIfAbsent(streamIdentifier, Instant.now()); + if (formerStreamsLeasesDeletionStrategy.leaseDeletionType() == StreamsLeasesDeletionType.FORMER_STREAMS_AUTO_DETECTION_DEFERRED_DELETION) { + // Now, we are identifying the stale/old streams and enqueuing it for deferred deletion. + // It is assumed that all the workers will always have the latest and consistent snapshot of streams + // from the multiStreamTracker. + // + // The following streams transition state among two workers are NOT considered safe, where Worker 2, on + // initialization learn about D from lease table and delete the leases for D, as it is not available + // in its latest MultiStreamTracker. + // Worker 1 : A,B,C -> A,B,C,D (latest) + // Worker 2 : BOOTS_UP -> A,B,C (stale) + // + // The following streams transition state among two workers are NOT considered safe, where Worker 2 might + // end up deleting the leases for A and D and lose progress made so far. + // Worker 1 : A,B,C -> A,B,C,D (latest) + // Worker 2 : A,B,C -> B,C (stale/partial) + // + // In order to give workers with stale stream info, sufficient time to learn about the new streams + // before attempting to delete it, we will be deferring the leases deletion based on the + // defer time period. 
+ + currentStreamConfigMap.keySet().stream().forEach(streamIdentifier -> enqueueStreamLeaseDeletionOperation.accept(streamIdentifier)); + + } else if (formerStreamsLeasesDeletionStrategy.leaseDeletionType() == StreamsLeasesDeletionType.PROVIDED_STREAMS_DEFERRED_DELETION) { + Optional.ofNullable(formerStreamsLeasesDeletionStrategy.streamIdentifiers()).ifPresent( + streamIdentifiers -> streamIdentifiers.stream().forEach(streamIdentifier -> enqueueStreamLeaseDeletionOperation.accept(streamIdentifier))); } + + // Now let's scan the streamIdentifiers eligible for deferred deletion and delete them. + // StreamIdentifiers are eligible for deletion only when the deferment period has elapsed and + // the streamIdentifiers are not present in the latest snapshot. + final Map> staleStreamIdDeletionDecisionMap = staleStreamDeletionMap.keySet().stream().collect(Collectors + .partitioningBy(streamIdentifier -> newStreamConfigMap.containsKey(streamIdentifier), Collectors.toSet())); + final Set staleStreamIdsToBeDeleted = staleStreamIdDeletionDecisionMap.get(false).stream().filter(streamIdentifier -> + Duration.between(staleStreamDeletionMap.get(streamIdentifier), Instant.now()).toMillis() >= waitPeriodToDeleteOldStreams.toMillis()).collect(Collectors.toSet()); + final Set deletedStreamsLeases = deleteMultiStreamLeases(staleStreamIdsToBeDeleted); + streamsSynced.addAll(deletedStreamsLeases); + + // Purge the active streams from stale streams list. 
+ final Set staleStreamIdsToBeRevived = staleStreamIdDeletionDecisionMap.get(true); + removeStreamsFromStaleStreamsList(staleStreamIdsToBeRevived); + + log.warn( + "Streams enqueued for deletion for lease table cleanup along with their scheduled time for deletion: {} ", + staleStreamDeletionMap.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, + entry -> entry.getValue().plus(waitPeriodToDeleteOldStreams)))); + + streamSyncWatch.reset().start(); + + MetricsUtil.addCount(metricsScope, ACTIVE_STREAMS_COUNT, newStreamConfigMap.size(), MetricsLevel.SUMMARY); + MetricsUtil.addCount(metricsScope, PENDING_STREAMS_DELETION_COUNT, staleStreamDeletionMap.size(), + MetricsLevel.SUMMARY); + MetricsUtil.addCount(metricsScope, DELETED_STREAMS_COUNT, deletedStreamsLeases.size(), MetricsLevel.SUMMARY); + } finally { + MetricsUtil.endScope(metricsScope); } - - // Now let's scan the streamIdentifiers eligible for deferred deletion and delete them. - // StreamIdentifiers are eligible for deletion only when the deferment period has elapsed and - // the streamIdentifiers are not present in the latest snapshot. - final Map> staleStreamIdDeletionDecisionMap = staleStreamDeletionMap.keySet() - .stream().collect(Collectors - .partitioningBy(streamIdentifier -> newStreamConfigMap.containsKey(streamIdentifier), - Collectors.toSet())); - final Set staleStreamIdsToBeDeleted = staleStreamIdDeletionDecisionMap.get(false).stream() - .filter(streamIdentifier -> - Duration.between(staleStreamDeletionMap.get(streamIdentifier), Instant.now()).toMillis() - >= waitPeriodToDeleteOldStreams.toMillis()).collect(Collectors.toSet()); - streamsSynced.addAll(deleteMultiStreamLeases(staleStreamIdsToBeDeleted)); - - // Purge the active streams from stale streams list. 
- final Set staleStreamIdsToBeRevived = staleStreamIdDeletionDecisionMap.get(true); - removeActiveStreamsFromStaleStreamsList(staleStreamIdsToBeRevived); - - log.warn("Streams enqueued for deletion for lease table cleanup along with their scheduled time for deletion: {} ", - staleStreamDeletionMap.entrySet().stream().collect(Collectors - .toMap(Map.Entry::getKey, entry -> entry.getValue().plus(waitPeriodToDeleteOldStreams)))); - - streamSyncWatch.reset().start(); } return streamsSynced; } - @VisibleForTesting - boolean shouldSyncStreamsNow() { - return isMultiStreamMode && (streamSyncWatch.elapsed(TimeUnit.MILLISECONDS) > NEW_STREAM_CHECK_INTERVAL_MILLIS); + @VisibleForTesting boolean shouldSyncStreamsNow() { + return isMultiStreamMode && + (streamSyncWatch.elapsed(TimeUnit.MILLISECONDS) > NEW_STREAM_CHECK_INTERVAL_MILLIS); } private void syncStreamsFromLeaseTableOnAppInit(List leases) { @@ -561,7 +589,7 @@ public class Scheduler implements Runnable { return (List) ((List) leaseCoordinator.leaseRefresher().listLeases()); } - private void removeActiveStreamsFromStaleStreamsList(Set streamIdentifiers) { + private void removeStreamsFromStaleStreamsList(Set streamIdentifiers) { for(StreamIdentifier streamIdentifier : streamIdentifiers) { staleStreamDeletionMap.remove(streamIdentifier); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/metrics/MetricsUtil.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/metrics/MetricsUtil.java index a2f9d84b..20c7c244 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/metrics/MetricsUtil.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/metrics/MetricsUtil.java @@ -94,6 +94,11 @@ public class MetricsUtil { metricsScope.addData(metricName, success ? 
1 : 0, StandardUnit.COUNT, metricsLevel); } + public static void addCount(@NonNull final MetricsScope metricsScope, final String dimension, + final long count, @NonNull final MetricsLevel metricsLevel) { + metricsScope.addData(dimension, count, StandardUnit.COUNT, metricsLevel); + } + public static void endScope(@NonNull final MetricsScope metricsScope) { metricsScope.end(); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy.java new file mode 100644 index 00000000..5c202040 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy.java @@ -0,0 +1,109 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package software.amazon.kinesis.processor; + +import software.amazon.kinesis.common.StreamIdentifier; + +import java.time.Duration; +import java.util.List; + +/** + * Strategy for cleaning up the leases for former streams. + */ +public interface FormerStreamsLeasesDeletionStrategy { + + /** + * StreamIdentifiers for which leases needs to be cleaned up in the lease table. + * @return + */ + List streamIdentifiers(); + + /** + * Duration to wait before deleting the leases for this stream. 
+ * @return + */ + Duration waitPeriodToDeleteFormerStreams(); + + /** + * Strategy type for deleting the leases of former streams + * @return + */ + StreamsLeasesDeletionType leaseDeletionType(); + + /** + * StreamsLeasesDeletionType identifying the different lease cleanup strategies. + */ + enum StreamsLeasesDeletionType { + NO_STREAMS_LEASES_DELETION, + FORMER_STREAMS_AUTO_DETECTION_DEFERRED_DELETION, + PROVIDED_STREAMS_DEFERRED_DELETION + } + + /** + * Strategy for not cleaning up leases for former streams. + */ + final class NoLeaseDeletionStrategy implements FormerStreamsLeasesDeletionStrategy { + + @Override + public final List streamIdentifiers() { + throw new UnsupportedOperationException("StreamIdentifiers not required"); + } + + @Override + public final Duration waitPeriodToDeleteFormerStreams() { + return Duration.ZERO; + } + + @Override + public final StreamsLeasesDeletionType leaseDeletionType() { + return StreamsLeasesDeletionType.NO_STREAMS_LEASES_DELETION; + } + } + + /** + * Strategy for auto-detecting the former streams based on the {@link MultiStreamTracker#streamConfigList()} + * and deferring their deletion based on {@link #waitPeriodToDeleteFormerStreams()} + */ + abstract class AutoDetectionAndDeferredDeletionStrategy implements FormerStreamsLeasesDeletionStrategy { + + @Override + public final List streamIdentifiers() { + throw new UnsupportedOperationException("StreamIdentifiers not required"); + } + + @Override + public final StreamsLeasesDeletionType leaseDeletionType() { + return StreamsLeasesDeletionType.FORMER_STREAMS_AUTO_DETECTION_DEFERRED_DELETION; + } + } + + /** + * Strategy to detect the streams for deletion through {@link #streamIdentifiers()} provided by customer at runtime + * and do deferred deletion based on {@link #waitPeriodToDeleteFormerStreams()} + */ + abstract class ProvidedStreamsDeferredDeletionStrategy implements FormerStreamsLeasesDeletionStrategy { + + @Override + public final StreamsLeasesDeletionType 
leaseDeletionType() { + return StreamsLeasesDeletionType.PROVIDED_STREAMS_DEFERRED_DELETION; + } + } + +} + + + + diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/MultiStreamTracker.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/MultiStreamTracker.java index 785778db..1b742509 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/MultiStreamTracker.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/MultiStreamTracker.java @@ -17,7 +17,6 @@ package software.amazon.kinesis.processor; import software.amazon.kinesis.common.StreamConfig; -import java.time.Duration; import java.util.List; /** @@ -29,15 +28,18 @@ public interface MultiStreamTracker { /** * Returns the list of stream config, to be processed by the current application. - * Note that this method will be called periodically called by the KCL to learn about the new and old streams. + * Note that the streams list CAN be changed during the application runtime. + * This method will be called periodically by the KCL to learn about the change in streams to process. * * @return List of StreamConfig */ List streamConfigList(); /** - * Duration to wait before deleting the old streams in the lease table. - * @return Wait time before deleting old streams + * Strategy to delete leases of old streams in the lease table. + * Note that the strategy CANNOT be changed during the application runtime. 
+ * + * @return StreamsLeasesDeletionStrategy */ - Duration waitPeriodToDeleteOldStreams(); + FormerStreamsLeasesDeletionStrategy formerStreamsLeasesDeletionStrategy(); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index 740cbbef..d4f17917 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -35,6 +35,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.mockito.internal.verification.VerificationModeFactory.atMost; +import static software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy.*; import java.time.Duration; import java.util.ArrayList; @@ -94,6 +95,7 @@ import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.metrics.MetricsConfig; import software.amazon.kinesis.processor.Checkpointer; +import software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy; import software.amazon.kinesis.processor.MultiStreamTracker; import software.amazon.kinesis.processor.ProcessorConfig; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; @@ -181,7 +183,12 @@ public class SchedulerTest { }}; when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList); - when(multiStreamTracker.waitPeriodToDeleteOldStreams()).thenReturn(Duration.ofHours(1L)); + when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()) + .thenReturn(new AutoDetectionAndDeferredDeletionStrategy() { + @Override public Duration waitPeriodToDeleteFormerStreams() { + return Duration.ZERO; + } + }); 
when(leaseCoordinator.leaseRefresher()).thenReturn(dynamoDBLeaseRefresher); when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector); when(shardSyncTaskManager.executeShardSyncTask()).thenReturn(new TaskResult(null)); @@ -442,7 +449,56 @@ public class SchedulerTest { } @Test - public final void testMultiStreamStaleStreamsAreNotDeletedImmediately() + public final void testMultiStreamStaleStreamsAreNotDeletedImmediatelyAutoDeletionStrategy() + throws DependencyException, ProvisionedThroughputException, InvalidStateException { + when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new AutoDetectionAndDeferredDeletionStrategy() { + @Override public Duration waitPeriodToDeleteFormerStreams() { + return Duration.ofHours(1); + } + }); + testMultiStreamStaleStreamsAreNotDeletedImmediately(true); + } + + @Test + public final void testMultiStreamStaleStreamsAreNotDeletedImmediatelyNoDeletionStrategy() + throws DependencyException, ProvisionedThroughputException, InvalidStateException { + when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new NoLeaseDeletionStrategy()); + testMultiStreamStaleStreamsAreNotDeletedImmediately(false); + } + + @Test + public final void testMultiStreamStaleStreamsAreNotDeletedImmediatelyProvidedListStrategy() + throws DependencyException, ProvisionedThroughputException, InvalidStateException { + when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new ProvidedStreamsDeferredDeletionStrategy() { + @Override public List streamIdentifiers() { + return null; + } + + @Override public Duration waitPeriodToDeleteFormerStreams() { + return Duration.ofHours(1); + } + }); + testMultiStreamStaleStreamsAreNotDeletedImmediately(false); + } + + @Test + public final void testMultiStreamStaleStreamsAreNotDeletedImmediatelyProvidedListStrategy2() + throws DependencyException, ProvisionedThroughputException, InvalidStateException { + 
when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new ProvidedStreamsDeferredDeletionStrategy() { + @Override public List streamIdentifiers() { + return IntStream.range(1, 3).mapToObj(streamId -> StreamIdentifier.multiStreamInstance( + Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))).collect( + Collectors.toCollection(ArrayList::new)); + } + + @Override public Duration waitPeriodToDeleteFormerStreams() { + return Duration.ofHours(1); + } + }); + testMultiStreamStaleStreamsAreNotDeletedImmediately(true); + } + + private final void testMultiStreamStaleStreamsAreNotDeletedImmediately(boolean expectPendingStreamsForDeletion) throws DependencyException, ProvisionedThroughputException, InvalidStateException { List streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig( StreamIdentifier.multiStreamInstance( @@ -457,6 +513,7 @@ public class SchedulerTest { retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName) .retrievalFactory(retrievalFactory); when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList1, streamConfigList2); + scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, metricsConfig, processorConfig, retrievalConfig)); when(scheduler.shouldSyncStreamsNow()).thenReturn(true); @@ -467,12 +524,59 @@ public class SchedulerTest { Assert.assertEquals(Sets.newHashSet(), syncedStreams); Assert.assertEquals(Sets.newHashSet(streamConfigList1), Sets.newHashSet(scheduler.currentStreamConfigMap().values())); - Assert.assertEquals(expectedPendingStreams, + Assert.assertEquals(expectPendingStreamsForDeletion ? 
expectedPendingStreams : Sets.newHashSet(), scheduler.staleStreamDeletionMap().keySet()); } @Test - public final void testMultiStreamStaleStreamsAreDeletedAfterDefermentPeriod() + public final void testMultiStreamStaleStreamsAreDeletedAfterDefermentPeriodWithAutoDetectionStrategy() + throws DependencyException, ProvisionedThroughputException, InvalidStateException { + when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new AutoDetectionAndDeferredDeletionStrategy() { + @Override public Duration waitPeriodToDeleteFormerStreams() { + return Duration.ZERO; + } + }); + testMultiStreamStaleStreamsAreDeletedAfterDefermentPeriod(true, null); + } + + @Test + public final void testMultiStreamStaleStreamsAreDeletedAfterDefermentPeriodWithProvidedListStrategy() + throws DependencyException, ProvisionedThroughputException, InvalidStateException { + when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new ProvidedStreamsDeferredDeletionStrategy() { + @Override public List streamIdentifiers() { + return null; + } + + @Override public Duration waitPeriodToDeleteFormerStreams() { + return Duration.ZERO; + } + }); + HashSet currentStreamConfigMapOverride = IntStream.range(1, 5).mapToObj( + streamId -> new StreamConfig(StreamIdentifier.multiStreamInstance( + Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)), + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST))) + .collect(Collectors.toCollection(HashSet::new)); + testMultiStreamStaleStreamsAreDeletedAfterDefermentPeriod(false, currentStreamConfigMapOverride); + } + + @Test + public final void testMultiStreamStaleStreamsAreDeletedAfterDefermentPeriodWithProvidedListStrategy2() + throws DependencyException, ProvisionedThroughputException, InvalidStateException { + when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new ProvidedStreamsDeferredDeletionStrategy() { + @Override public List 
streamIdentifiers() { + return IntStream.range(1, 3).mapToObj(streamId -> StreamIdentifier.multiStreamInstance( + Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))).collect( + Collectors.toCollection(ArrayList::new)); + } + + @Override public Duration waitPeriodToDeleteFormerStreams() { + return Duration.ZERO; + } + }); + testMultiStreamStaleStreamsAreDeletedAfterDefermentPeriod(true, null); + } + + private final void testMultiStreamStaleStreamsAreDeletedAfterDefermentPeriod(boolean expectSyncedStreams, Set currentStreamConfigMapOverride) throws DependencyException, ProvisionedThroughputException, InvalidStateException { List streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig( StreamIdentifier.multiStreamInstance( @@ -490,20 +594,69 @@ public class SchedulerTest { scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, metricsConfig, processorConfig, retrievalConfig)); when(scheduler.shouldSyncStreamsNow()).thenReturn(true); - when(multiStreamTracker.waitPeriodToDeleteOldStreams()).thenReturn(Duration.ZERO); Set syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases(); Set expectedSyncedStreams = IntStream.range(1, 3).mapToObj(streamId -> StreamIdentifier.multiStreamInstance( Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))).collect( Collectors.toCollection(HashSet::new)); - Assert.assertEquals(expectedSyncedStreams, syncedStreams); - Assert.assertEquals(Sets.newHashSet(streamConfigList2), + Assert.assertEquals(expectSyncedStreams ? expectedSyncedStreams : Sets.newHashSet(), syncedStreams); + Assert.assertEquals(currentStreamConfigMapOverride == null ? 
Sets.newHashSet(streamConfigList2) : currentStreamConfigMapOverride, Sets.newHashSet(scheduler.currentStreamConfigMap().values())); Assert.assertEquals(Sets.newHashSet(), scheduler.staleStreamDeletionMap().keySet()); } @Test - public final void testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediately() + public final void testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediatelyWithAutoDetectionStrategy() + throws DependencyException, ProvisionedThroughputException, InvalidStateException { + when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new AutoDetectionAndDeferredDeletionStrategy() { + @Override public Duration waitPeriodToDeleteFormerStreams() { + return Duration.ofHours(1); + } + }); + testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediately(true); + } + + @Test + public final void testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediatelyWithNoDeletionStrategy() + throws DependencyException, ProvisionedThroughputException, InvalidStateException { + when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new NoLeaseDeletionStrategy()); + testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediately(false); + } + + @Test + public final void testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediatelyWithProvidedListStrategy() + throws DependencyException, ProvisionedThroughputException, InvalidStateException { + when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new ProvidedStreamsDeferredDeletionStrategy() { + @Override public List streamIdentifiers() { + return null; + } + + @Override public Duration waitPeriodToDeleteFormerStreams() { + return Duration.ofHours(1); + } + }); + testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediately(false); + } + + @Test + public final void testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediatelyWithProvidedListStrategy2() + throws 
DependencyException, ProvisionedThroughputException, InvalidStateException { + when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new ProvidedStreamsDeferredDeletionStrategy() { + @Override public List streamIdentifiers() { + return IntStream.range(1, 3) + .mapToObj(streamId -> StreamIdentifier.multiStreamInstance( + Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))) + .collect(Collectors.toCollection(ArrayList::new)); + } + + @Override public Duration waitPeriodToDeleteFormerStreams() { + return Duration.ofHours(1); + } + }); + testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediately(true); + } + + private final void testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediately(boolean expectPendingStreamsForDeletion) throws DependencyException, ProvisionedThroughputException, InvalidStateException { List streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig( StreamIdentifier.multiStreamInstance( @@ -538,7 +691,7 @@ public class SchedulerTest { .collect(Collectors.toCollection(LinkedList::new)); Assert.assertEquals(Sets.newHashSet(expectedCurrentStreamConfigs), Sets.newHashSet(scheduler.currentStreamConfigMap().values())); - Assert.assertEquals(expectedPendingStreams, + Assert.assertEquals(expectPendingStreamsForDeletion ? 
expectedPendingStreams: Sets.newHashSet(), scheduler.staleStreamDeletionMap().keySet()); } @@ -561,7 +714,11 @@ public class SchedulerTest { scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, metricsConfig, processorConfig, retrievalConfig)); when(scheduler.shouldSyncStreamsNow()).thenReturn(true); - when(multiStreamTracker.waitPeriodToDeleteOldStreams()).thenReturn(Duration.ZERO); + when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new AutoDetectionAndDeferredDeletionStrategy() { + @Override public Duration waitPeriodToDeleteFormerStreams() { + return Duration.ZERO; + } + }); Set syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases(); Set expectedSyncedStreams = IntStream.concat(IntStream.range(1, 3), IntStream.range(5, 7)) .mapToObj(streamId -> StreamIdentifier.multiStreamInstance(