diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index b6efc793..6a87db23 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -492,24 +492,6 @@ public class Scheduler implements Runnable { } }; - if (SHOULD_DO_LEASE_SYNC_FOR_OLD_STREAMS) { - // We do lease sync for old streams, before leaving to the deletion strategy to delete leases for - // strategy detected leases. - Iterator currentSetOfStreamsIter = currentStreamConfigMap.keySet().iterator(); - while (currentSetOfStreamsIter.hasNext()) { - StreamIdentifier streamIdentifier = currentSetOfStreamsIter.next(); - if (!newStreamConfigMap.containsKey(streamIdentifier)) { - log.info("Found old/deleted stream: " + streamIdentifier - + ". Syncing shards of that stream."); - ShardSyncTaskManager shardSyncTaskManager = createOrGetShardSyncTaskManager( - currentStreamConfigMap.get(streamIdentifier)); - shardSyncTaskManager.submitShardSyncTask(); - currentSetOfStreamsIter.remove(); - streamsSynced.add(streamIdentifier); - } - } - } - if (formerStreamsLeasesDeletionStrategy.leaseDeletionType() == StreamsLeasesDeletionType.FORMER_STREAMS_AUTO_DETECTION_DEFERRED_DELETION) { // Now, we are identifying the stale/old streams and enqueuing it for deferred deletion. // It is assumed that all the workers will always have the latest and consistent snapshot of streams @@ -535,6 +517,29 @@ public class Scheduler implements Runnable { } else if (formerStreamsLeasesDeletionStrategy.leaseDeletionType() == StreamsLeasesDeletionType.PROVIDED_STREAMS_DEFERRED_DELETION) { Optional.ofNullable(formerStreamsLeasesDeletionStrategy.streamIdentifiersForLeaseCleanup()).ifPresent( streamIdentifiers -> streamIdentifiers.stream().forEach(streamIdentifier -> enqueueStreamLeaseDeletionOperation.accept(streamIdentifier))); + } else { + // Remove the old/stale streams identified through the new and existing streams list, without + // cleaning up their leases. Disabling deprecated shard sync + lease cleanup through a flag. + Iterator currentSetOfStreamsIter = currentStreamConfigMap.keySet().iterator(); + while (currentSetOfStreamsIter.hasNext()) { + StreamIdentifier streamIdentifier = currentSetOfStreamsIter.next(); + if (!newStreamConfigMap.containsKey(streamIdentifier)) { + if (SHOULD_DO_LEASE_SYNC_FOR_OLD_STREAMS) { + log.info( + "Found old/deleted stream : {}. Triggering shard sync. Removing from tracked active streams." + + streamIdentifier); + ShardSyncTaskManager shardSyncTaskManager = createOrGetShardSyncTaskManager( + currentStreamConfigMap.get(streamIdentifier)); + shardSyncTaskManager.submitShardSyncTask(); + } else { + log.info( + "Found old/deleted stream : {}. Removing from tracked active streams, but not cleaning up leases," + + " as part of this workflow" + streamIdentifier); + } + currentSetOfStreamsIter.remove(); + streamsSynced.add(streamIdentifier); + } + } } // Now let's scan the streamIdentifiersForLeaseCleanup eligible for deferred deletion and delete them. diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index 9e15d988..a066ece0 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -468,14 +468,14 @@ public class SchedulerTest { return Duration.ofHours(1); } }); - testMultiStreamStaleStreamsAreNotDeletedImmediately(true); + testMultiStreamStaleStreamsAreNotDeletedImmediately(true, false); } @Test public final void testMultiStreamStaleStreamsAreNotDeletedImmediatelyNoDeletionStrategy() throws DependencyException, ProvisionedThroughputException, InvalidStateException { when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new NoLeaseDeletionStrategy()); - testMultiStreamStaleStreamsAreNotDeletedImmediately(false); + testMultiStreamStaleStreamsAreNotDeletedImmediately(false, true); } @Test @@ -490,7 +490,7 @@ public class SchedulerTest { return Duration.ofHours(1); } }); - testMultiStreamStaleStreamsAreNotDeletedImmediately(false); + testMultiStreamStaleStreamsAreNotDeletedImmediately(false, false); } @Test @@ -507,10 +507,11 @@ public class SchedulerTest { return Duration.ofHours(1); } }); - testMultiStreamStaleStreamsAreNotDeletedImmediately(true); + testMultiStreamStaleStreamsAreNotDeletedImmediately(true, false); } - private final void testMultiStreamStaleStreamsAreNotDeletedImmediately(boolean expectPendingStreamsForDeletion) + private final void testMultiStreamStaleStreamsAreNotDeletedImmediately(boolean expectPendingStreamsForDeletion, + boolean onlyStreamsDeletionNotLeases) throws DependencyException, ProvisionedThroughputException, InvalidStateException { List streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig( StreamIdentifier.multiStreamInstance( @@ -533,8 +534,9 @@ public class SchedulerTest { Set expectedPendingStreams = IntStream.range(1, 3).mapToObj(streamId -> StreamIdentifier.multiStreamInstance( Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))).collect( Collectors.toCollection(HashSet::new)); - Assert.assertEquals(Sets.newHashSet(), syncedStreams); - Assert.assertEquals(Sets.newHashSet(streamConfigList1), + Set expectedSyncedStreams = onlyStreamsDeletionNotLeases ? expectedPendingStreams : Sets.newHashSet(); + Assert.assertEquals(expectedSyncedStreams, syncedStreams); + Assert.assertEquals(Sets.newHashSet(onlyStreamsDeletionNotLeases ? streamConfigList2 : streamConfigList1), Sets.newHashSet(scheduler.currentStreamConfigMap().values())); Assert.assertEquals(expectPendingStreamsForDeletion ? expectedPendingStreams : Sets.newHashSet(), scheduler.staleStreamDeletionMap().keySet()); @@ -625,14 +627,14 @@ public class SchedulerTest { return Duration.ofHours(1); } }); - testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediately(true); + testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediately(true, false); } @Test public final void testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediatelyWithNoDeletionStrategy() throws DependencyException, ProvisionedThroughputException, InvalidStateException { when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new NoLeaseDeletionStrategy()); - testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediately(false); + testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediately(false, true); } @Test @@ -647,7 +649,7 @@ public class SchedulerTest { return Duration.ofHours(1); } }); - testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediately(false); + testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediately(false, false); } @Test @@ -665,10 +667,11 @@ public class SchedulerTest { return Duration.ofHours(1); } }); - testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediately(true); + testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediately(true, false); } - private final void testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediately(boolean expectPendingStreamsForDeletion) + private final void testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediately(boolean expectPendingStreamsForDeletion, + boolean onlyStreamsNoLeasesDeletion) throws DependencyException, ProvisionedThroughputException, InvalidStateException { List streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig( StreamIdentifier.multiStreamInstance( @@ -687,20 +690,39 @@ public class SchedulerTest { metricsConfig, processorConfig, retrievalConfig)); when(scheduler.shouldSyncStreamsNow()).thenReturn(true); Set syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases(); - Set expectedSyncedStreams = IntStream.range(5, 7) - .mapToObj(streamId -> StreamIdentifier.multiStreamInstance( - Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))) - .collect(Collectors.toCollection(HashSet::new)); + Set expectedSyncedStreams; Set expectedPendingStreams = IntStream.range(1, 3) .mapToObj(streamId -> StreamIdentifier.multiStreamInstance( Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))) .collect(Collectors.toCollection(HashSet::new)); + + if(onlyStreamsNoLeasesDeletion) { + expectedSyncedStreams = IntStream.concat(IntStream.range(1, 3), IntStream.range(5, 7)) + .mapToObj(streamId -> StreamIdentifier.multiStreamInstance( + Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))) + .collect(Collectors.toCollection(HashSet::new)); + } else { + expectedSyncedStreams = IntStream.range(5, 7) + .mapToObj(streamId -> StreamIdentifier.multiStreamInstance( + Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))) + .collect(Collectors.toCollection(HashSet::new)); + } + Assert.assertEquals(expectedSyncedStreams, syncedStreams); - List expectedCurrentStreamConfigs = IntStream.range(1, 7).mapToObj(streamId -> new StreamConfig( - StreamIdentifier.multiStreamInstance( - Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)), - InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST))) - .collect(Collectors.toCollection(LinkedList::new)); + List expectedCurrentStreamConfigs; + if(onlyStreamsNoLeasesDeletion) { + expectedCurrentStreamConfigs = IntStream.range(3, 7).mapToObj(streamId -> new StreamConfig( + StreamIdentifier.multiStreamInstance( + Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)), + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST))) + .collect(Collectors.toCollection(LinkedList::new)); + } else { + expectedCurrentStreamConfigs = IntStream.range(1, 7).mapToObj(streamId -> new StreamConfig( + StreamIdentifier.multiStreamInstance( + Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)), + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST))) + .collect(Collectors.toCollection(LinkedList::new)); + } Assert.assertEquals(Sets.newHashSet(expectedCurrentStreamConfigs), Sets.newHashSet(scheduler.currentStreamConfigMap().values())); Assert.assertEquals(expectPendingStreamsForDeletion ? expectedPendingStreams: Sets.newHashSet(),