diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index e36f0582..b6efc793 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -113,6 +113,7 @@ public class Scheduler implements Runnable { private static final long MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 1 * 1000L; private static final long MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 30 * 1000L; private static final long NEW_STREAM_CHECK_INTERVAL_MILLIS = 1 * 60 * 1000L; + private static final boolean SHOULD_DO_LEASE_SYNC_FOR_OLD_STREAMS = false; private static final String MULTI_STREAM_TRACKER = "MultiStreamTracker"; private static final String ACTIVE_STREAMS_COUNT = "ActiveStreams.Count"; private static final String PENDING_STREAMS_DELETION_COUNT = "StreamsPendingDeletion.Count"; @@ -491,10 +492,9 @@ public class Scheduler implements Runnable { } }; - if (formerStreamsLeasesDeletionStrategy.shouldCleanupLeasesForDeletedStreams()) { + if (SHOULD_DO_LEASE_SYNC_FOR_OLD_STREAMS) { // We do lease sync for old streams, before leaving to the deletion strategy to delete leases for - // strategy detected leases. Also, for deleted streams we expect the shard sync to remove the - // leases. + // strategy detected leases. Iterator currentSetOfStreamsIter = currentStreamConfigMap.keySet().iterator(); while (currentSetOfStreamsIter.hasNext()) { StreamIdentifier streamIdentifier = currentSetOfStreamsIter.next(); @@ -533,13 +533,13 @@ public class Scheduler implements Runnable { currentStreamConfigMap.keySet().stream().forEach(streamIdentifier -> enqueueStreamLeaseDeletionOperation.accept(streamIdentifier)); } else if (formerStreamsLeasesDeletionStrategy.leaseDeletionType() == StreamsLeasesDeletionType.PROVIDED_STREAMS_DEFERRED_DELETION) { - Optional.ofNullable(formerStreamsLeasesDeletionStrategy.streamIdentifiers()).ifPresent( + Optional.ofNullable(formerStreamsLeasesDeletionStrategy.streamIdentifiersForLeaseCleanup()).ifPresent( streamIdentifiers -> streamIdentifiers.stream().forEach(streamIdentifier -> enqueueStreamLeaseDeletionOperation.accept(streamIdentifier))); } - // Now let's scan the streamIdentifiers eligible for deferred deletion and delete them. + // Now let's scan the streamIdentifiersForLeaseCleanup eligible for deferred deletion and delete them. // StreamIdentifiers are eligible for deletion only when the deferment period has elapsed and - // the streamIdentifiers are not present in the latest snapshot. + // the streamIdentifiersForLeaseCleanup are not present in the latest snapshot. final Map> staleStreamIdDeletionDecisionMap = staleStreamDeletionMap.keySet().stream().collect(Collectors .partitioningBy(streamIdentifier -> newStreamConfigMap.containsKey(streamIdentifier), Collectors.toSet())); final Set staleStreamIdsToBeDeleted = staleStreamIdDeletionDecisionMap.get(false).stream().filter(streamIdentifier -> diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy.java index e59266a4..232c428d 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy.java @@ -29,7 +29,7 @@ public interface FormerStreamsLeasesDeletionStrategy { * StreamIdentifiers for which leases needs to be cleaned up in the lease table. * @return */ - List streamIdentifiers(); + List streamIdentifiersForLeaseCleanup(); /** * Duration to wait before deleting the leases for this stream. @@ -43,12 +43,6 @@ public interface FormerStreamsLeasesDeletionStrategy { */ StreamsLeasesDeletionType leaseDeletionType(); - /** - * Should the leases be cleaned up for deleted streams - * @return true if leases be cleaned up for deleted streams; false otherwise. - */ - boolean shouldCleanupLeasesForDeletedStreams(); - /** * StreamsLeasesDeletionType identifying the different lease cleanup strategies. */ @@ -64,7 +58,7 @@ public interface FormerStreamsLeasesDeletionStrategy { final class NoLeaseDeletionStrategy implements FormerStreamsLeasesDeletionStrategy { @Override - public final List streamIdentifiers() { + public final List streamIdentifiersForLeaseCleanup() { throw new UnsupportedOperationException("StreamIdentifiers not required"); } @@ -77,37 +71,6 @@ public interface FormerStreamsLeasesDeletionStrategy { public final StreamsLeasesDeletionType leaseDeletionType() { return StreamsLeasesDeletionType.NO_STREAMS_LEASES_DELETION; } - - @Override - public final boolean shouldCleanupLeasesForDeletedStreams() { - return false; - } - } - - /** - * Strategy for not cleaning up leases for former streams. - */ - final class OnlyDeletedStreamsLeasesCleanupStrategy implements FormerStreamsLeasesDeletionStrategy { - - @Override - public final List streamIdentifiers() { - throw new UnsupportedOperationException("StreamIdentifiers not required"); - } - - @Override - public final Duration waitPeriodToDeleteFormerStreams() { - return Duration.ZERO; - } - - @Override - public final StreamsLeasesDeletionType leaseDeletionType() { - return StreamsLeasesDeletionType.NO_STREAMS_LEASES_DELETION; - } - - @Override - public final boolean shouldCleanupLeasesForDeletedStreams() { - return true; - } } /** @@ -117,7 +80,7 @@ public interface FormerStreamsLeasesDeletionStrategy { abstract class AutoDetectionAndDeferredDeletionStrategy implements FormerStreamsLeasesDeletionStrategy { @Override - public final List streamIdentifiers() { + public final List streamIdentifiersForLeaseCleanup() { throw new UnsupportedOperationException("StreamIdentifiers not required"); } @@ -125,15 +88,10 @@ public interface FormerStreamsLeasesDeletionStrategy { public final StreamsLeasesDeletionType leaseDeletionType() { return StreamsLeasesDeletionType.FORMER_STREAMS_AUTO_DETECTION_DEFERRED_DELETION; } - - @Override - public boolean shouldCleanupLeasesForDeletedStreams() { - return false; - } } /** - * Strategy to detect the streams for deletion through {@link #streamIdentifiers()} provided by customer at runtime + * Strategy to detect the streams for deletion through {@link #streamIdentifiersForLeaseCleanup()} provided by customer at runtime * and do deferred deletion based on {@link #waitPeriodToDeleteFormerStreams()} */ abstract class ProvidedStreamsDeferredDeletionStrategy implements FormerStreamsLeasesDeletionStrategy { @@ -142,11 +100,6 @@ public interface FormerStreamsLeasesDeletionStrategy { public final StreamsLeasesDeletionType leaseDeletionType() { return StreamsLeasesDeletionType.PROVIDED_STREAMS_DEFERRED_DELETION; } - - @Override - public boolean shouldCleanupLeasesForDeletedStreams() { - return false; - } } } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index af58d3ab..9e15d988 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -482,7 +482,7 @@ public class SchedulerTest { public final void testMultiStreamStaleStreamsAreNotDeletedImmediatelyProvidedListStrategy() throws DependencyException, ProvisionedThroughputException, InvalidStateException { when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new ProvidedStreamsDeferredDeletionStrategy() { - @Override public List streamIdentifiers() { + @Override public List streamIdentifiersForLeaseCleanup() { return null; } @@ -497,7 +497,7 @@ public class SchedulerTest { public final void testMultiStreamStaleStreamsAreNotDeletedImmediatelyProvidedListStrategy2() throws DependencyException, ProvisionedThroughputException, InvalidStateException { when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new ProvidedStreamsDeferredDeletionStrategy() { - @Override public List streamIdentifiers() { + @Override public List streamIdentifiersForLeaseCleanup() { return IntStream.range(1, 3).mapToObj(streamId -> StreamIdentifier.multiStreamInstance( Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))).collect( Collectors.toCollection(ArrayList::new)); @@ -555,7 +555,7 @@ public class SchedulerTest { public final void testMultiStreamStaleStreamsAreDeletedAfterDefermentPeriodWithProvidedListStrategy() throws DependencyException, ProvisionedThroughputException, InvalidStateException { when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new ProvidedStreamsDeferredDeletionStrategy() { - @Override public List streamIdentifiers() { + @Override public List streamIdentifiersForLeaseCleanup() { return null; } @@ -575,7 +575,7 @@ public class SchedulerTest { public final void testMultiStreamStaleStreamsAreDeletedAfterDefermentPeriodWithProvidedListStrategy2() throws DependencyException, ProvisionedThroughputException, InvalidStateException { when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new ProvidedStreamsDeferredDeletionStrategy() { - @Override public List streamIdentifiers() { + @Override public List streamIdentifiersForLeaseCleanup() { return IntStream.range(1, 3).mapToObj(streamId -> StreamIdentifier.multiStreamInstance( Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))).collect( Collectors.toCollection(ArrayList::new)); @@ -639,7 +639,7 @@ public class SchedulerTest { public final void testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediatelyWithProvidedListStrategy() throws DependencyException, ProvisionedThroughputException, InvalidStateException { when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new ProvidedStreamsDeferredDeletionStrategy() { - @Override public List streamIdentifiers() { + @Override public List streamIdentifiersForLeaseCleanup() { return null; } @@ -654,7 +654,7 @@ public class SchedulerTest { public final void testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediatelyWithProvidedListStrategy2() throws DependencyException, ProvisionedThroughputException, InvalidStateException { when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new ProvidedStreamsDeferredDeletionStrategy() { - @Override public List streamIdentifiers() { + @Override public List streamIdentifiersForLeaseCleanup() { return IntStream.range(1, 3) .mapToObj(streamId -> StreamIdentifier.multiStreamInstance( Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)))