diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DeletedStreamListProvider.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DeletedStreamListProvider.java index e79c2b93..d0d332d9 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DeletedStreamListProvider.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DeletedStreamListProvider.java @@ -28,7 +28,7 @@ public class DeletedStreamListProvider { /** * Method returns and empties the current set of streams - * @return list of deleted Streams + * @return set of deleted Streams */ public Set purgeAllDeletedStream() { final Set response = new HashSet<>(deletedStreams); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index 3d04a79e..5b951029 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -217,7 +217,6 @@ public class Scheduler implements Runnable { @NonNull final ProcessorConfig processorConfig, @NonNull final RetrievalConfig retrievalConfig, @NonNull final DiagnosticEventFactory diagnosticEventFactory) { - log.info("Scheduler invoked for version 2.4.6, V1"); this.checkpointConfig = checkpointConfig; this.coordinatorConfig = coordinatorConfig; this.leaseManagementConfig = leaseManagementConfig; @@ -563,7 +562,12 @@ public class Scheduler implements Runnable { // These are the streams which are deleted in Kinesis and we encounter resource not found during // shardSyncTask. This is applicable in MultiStreamMode only, in case of SingleStreamMode, store will // not have any data. - final Set deletedStreamSet = this.deletedStreamListProvider.purgeAllDeletedStream(); + // Filter streams based on newStreamConfigMap so that we don't override input to KCL in any case. + final Set deletedStreamSet = this.deletedStreamListProvider + .purgeAllDeletedStream() + .stream() + .filter(streamIdentifier -> !newStreamConfigMap.containsKey(streamIdentifier)) + .collect(Collectors.toSet()); if (deletedStreamSet.size() > 0) { log.info("Stale streams to delete: {}", deletedStreamSet); staleStreamIdsToBeDeleted.addAll(deletedStreamSet); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java index 946cbb7c..6bf2ff39 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java @@ -505,15 +505,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { */ @Override public ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFactory, StreamConfig streamConfig) { - return new ShardSyncTaskManager(this.createShardDetector(streamConfig), - this.createLeaseRefresher(), - streamConfig.initialPositionInStreamExtended(), - cleanupLeasesUponShardCompletion, - ignoreUnexpectedChildShards, - shardSyncIntervalMillis, - executorService, - new HierarchicalShardSyncer(isMultiStreamMode, streamConfig.streamIdentifier().toString()), - metricsFactory); + return createShardSyncTaskManager(metricsFactory, streamConfig, null); } /** @@ -521,6 +513,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { * * @param metricsFactory - factory to get metrics object * @param streamConfig - streamConfig for which ShardSyncTaskManager needs to be created + * @param deletedStreamListProvider - store for capturing the streams which are deleted in kinesis * @return ShardSyncTaskManager */ @Override diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index c8e3a579..e0ad8426 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -783,37 +783,73 @@ public class SchedulerTest { List streamConfigList1 = createDummyStreamConfigList(1,6); List streamConfigList2 = createDummyStreamConfigList(1,4); when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList1, streamConfigList2); + + prepareForStaleDeletedStreamCleanupTests(); + + // when KCL starts it starts with tracking 5 stream + assertEquals(Sets.newHashSet(streamConfigList1), Sets.newHashSet(scheduler.currentStreamConfigMap().values())); + assertEquals(0, scheduler.staleStreamDeletionMap().size()); + + // 2 Streams are no longer needed to be consumed + Set syncedStreams1 = scheduler.checkAndSyncStreamShardsAndLeases(); + assertEquals(Sets.newHashSet(streamConfigList1), Sets.newHashSet(scheduler.currentStreamConfigMap().values())); + assertEquals(createDummyStreamConfigList(4, 6).stream() + .map(StreamConfig::streamIdentifier) + .collect(Collectors.toSet()), scheduler.staleStreamDeletionMap() + .keySet()); + assertEquals(0, syncedStreams1.size()); + + StreamConfig deletedStreamConfig = createDummyStreamConfig(5); + // One stream is deleted from Kinesis side + scheduler.deletedStreamListProvider().add(deletedStreamConfig.streamIdentifier()); + + Set syncedStreams2 = scheduler.checkAndSyncStreamShardsAndLeases(); + + Set expectedCurrentStreamConfigs = Sets.newHashSet(streamConfigList1); + expectedCurrentStreamConfigs.remove(deletedStreamConfig); + + //assert kinesis deleted stream is cleaned up from KCL in memory state. + assertEquals(expectedCurrentStreamConfigs, Sets.newHashSet(scheduler.currentStreamConfigMap().values())); + assertEquals(Sets.newHashSet(createDummyStreamConfig(4).streamIdentifier()), + Sets.newHashSet(scheduler.staleStreamDeletionMap().keySet())); + assertEquals(1, syncedStreams2.size()); + assertEquals(0, scheduler.deletedStreamListProvider().purgeAllDeletedStream().size()); + + verify(multiStreamTracker, times(3)).streamConfigList(); + + } + + private void prepareForStaleDeletedStreamCleanupTests() { + when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new AutoDetectionAndDeferredDeletionStrategy() { @Override public Duration waitPeriodToDeleteFormerStreams() { return Duration.ofDays(1); } }); + retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName) .retrievalFactory(retrievalFactory); scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, metricsConfig, processorConfig, retrievalConfig)); when(scheduler.shouldSyncStreamsNow()).thenReturn(true); - - // when KCL starts it starts with tracking 5 stream - assertEquals(5, scheduler.currentStreamConfigMap().size()); - assertEquals(0, scheduler.staleStreamDeletionMap().size()); - - // 2 Streams are no longer needed to be consumed - Set syncedStreams2 = scheduler.checkAndSyncStreamShardsAndLeases(); - assertEquals(5, scheduler.currentStreamConfigMap().size()); - assertEquals(2, scheduler.staleStreamDeletionMap().size()); - - // One the stream is deleted from Kinesis side - scheduler.deletedStreamListProvider().add(createDummyStreamConfig(5).streamIdentifier()); - - Set syncedStreams3 = scheduler.checkAndSyncStreamShardsAndLeases(); - //assert kinesis deleted stream is cleaned up from KCL in memory state. - assertEquals(4, scheduler.currentStreamConfigMap().size()); - assertEquals(1, scheduler.staleStreamDeletionMap().size()); - - verify(multiStreamTracker, times(3)).streamConfigList(); - } + // Tests validate that no cleanup of stream is done if its still tracked in multiStreamTracker + @Test + public void testKinesisStaleDeletedStreamNoCleanUpForTrackedStream() + throws ProvisionedThroughputException, InvalidStateException, DependencyException { + List streamConfigList1 = createDummyStreamConfigList(1,6); + when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList1); + prepareForStaleDeletedStreamCleanupTests(); + + scheduler.deletedStreamListProvider().add(createDummyStreamConfig(3).streamIdentifier()); + + Set syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases(); + + assertEquals(0, syncedStreams.size()); + assertEquals(0, scheduler.staleStreamDeletionMap().size()); + assertEquals(Sets.newHashSet(streamConfigList1), Sets.newHashSet(scheduler.currentStreamConfigMap().values())); + } + //Creates list of upperBound-lowerBound no of dummy StreamConfig private List createDummyStreamConfigList(int lowerBound, int upperBound) { return IntStream.range(lowerBound, upperBound).mapToObj(this::createDummyStreamConfig)