From e86b1d1f0139efa5612c7ff54945da70096b64fd Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Tue, 5 May 2020 17:34:37 -0700 Subject: [PATCH] Adding shouldCleanupLeasesForDeletedStreams option to FormerStreamsLeasesDeletionStrategy and introducing OnlyDeletedStreamsLeasesCleanupStrategy. --- .../amazon/kinesis/coordinator/Scheduler.java | 22 ++++++++- .../FormerStreamsLeasesDeletionStrategy.java | 49 ++++++++++++++++++- .../kinesis/coordinator/SchedulerTest.java | 1 - 3 files changed, 69 insertions(+), 3 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index e520fdce..a20f86d1 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -26,6 +26,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; @@ -509,6 +510,25 @@ public class Scheduler implements Runnable { } }; + if (formerStreamsLeasesDeletionStrategy.shouldCleanupLeasesForDeletedStreams()) { + // We do lease sync for old streams, before leaving to the deletion strategy to delete leases for + // strategy detected leases. Also, for deleted streams we expect the shard sync to remove the + // leases. + Iterator currentSetOfStreamsIter = currentStreamConfigMap.keySet().iterator(); + while (currentSetOfStreamsIter.hasNext()) { + StreamIdentifier streamIdentifier = currentSetOfStreamsIter.next(); + if (!newStreamConfigMap.containsKey(streamIdentifier)) { + log.info("Found old/deleted stream: " + streamIdentifier + + ". Syncing shards of that stream."); + ShardSyncTaskManager shardSyncTaskManager = createOrGetShardSyncTaskManager( + currentStreamConfigMap.get(streamIdentifier)); + shardSyncTaskManager.syncShardAndLeaseInfo(); + currentSetOfStreamsIter.remove(); + streamsSynced.add(streamIdentifier); + } + } + } + if (formerStreamsLeasesDeletionStrategy.leaseDeletionType() == StreamsLeasesDeletionType.FORMER_STREAMS_AUTO_DETECTION_DEFERRED_DELETION) { // Now, we are identifying the stale/old streams and enqueuing it for deferred deletion. // It is assumed that all the workers will always have the latest and consistent snapshot of streams @@ -611,7 +631,7 @@ public class Scheduler implements Runnable { .groupingBy(MultiStreamLease::streamIdentifier, Collectors.toCollection(ArrayList::new))); } - log.warn("Found old/deleted stream: " + streamIdentifier + ". Deleting leases of this stream."); + log.warn("Found old/deleted stream: " + streamIdentifier + ". Directly deleting leases of this stream."); // Deleting leases will cause the workers to shutdown the record processors for these shards. if (deleteMultiStreamLeases(streamIdToShardsMap.get(streamIdentifier.serialize()))) { currentStreamConfigMap.remove(streamIdentifier); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy.java index 5c202040..e59266a4 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy.java @@ -38,11 +38,17 @@ public interface FormerStreamsLeasesDeletionStrategy { Duration waitPeriodToDeleteFormerStreams(); /** - * Strategy type for deleting the leases of former streams + * Strategy type for deleting the leases of former active streams. * @return */ StreamsLeasesDeletionType leaseDeletionType(); + /** + * Should the leases be cleaned up for deleted streams + * @return true if leases be cleaned up for deleted streams; false otherwise. + */ + boolean shouldCleanupLeasesForDeletedStreams(); + /** * StreamsLeasesDeletionType identifying the different lease cleanup strategies. */ @@ -71,6 +77,37 @@ public interface FormerStreamsLeasesDeletionStrategy { public final StreamsLeasesDeletionType leaseDeletionType() { return StreamsLeasesDeletionType.NO_STREAMS_LEASES_DELETION; } + + @Override + public final boolean shouldCleanupLeasesForDeletedStreams() { + return false; + } + } + + /** + * Strategy for not cleaning up leases for former streams. + */ + final class OnlyDeletedStreamsLeasesCleanupStrategy implements FormerStreamsLeasesDeletionStrategy { + + @Override + public final List streamIdentifiers() { + throw new UnsupportedOperationException("StreamIdentifiers not required"); + } + + @Override + public final Duration waitPeriodToDeleteFormerStreams() { + return Duration.ZERO; + } + + @Override + public final StreamsLeasesDeletionType leaseDeletionType() { + return StreamsLeasesDeletionType.NO_STREAMS_LEASES_DELETION; + } + + @Override + public final boolean shouldCleanupLeasesForDeletedStreams() { + return true; + } } /** @@ -88,6 +125,11 @@ public interface FormerStreamsLeasesDeletionStrategy { public final StreamsLeasesDeletionType leaseDeletionType() { return StreamsLeasesDeletionType.FORMER_STREAMS_AUTO_DETECTION_DEFERRED_DELETION; } + + @Override + public boolean shouldCleanupLeasesForDeletedStreams() { + return false; + } } /** @@ -100,6 +142,11 @@ public interface FormerStreamsLeasesDeletionStrategy { public final StreamsLeasesDeletionType leaseDeletionType() { return StreamsLeasesDeletionType.PROVIDED_STREAMS_DEFERRED_DELETION; } + + @Override + public boolean shouldCleanupLeasesForDeletedStreams() { + return false; + } } } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index d4f17917..a5fd2add 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -95,7 +95,6 @@ import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.metrics.MetricsConfig; import software.amazon.kinesis.processor.Checkpointer; -import software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy; import software.amazon.kinesis.processor.MultiStreamTracker; import software.amazon.kinesis.processor.ProcessorConfig; import software.amazon.kinesis.processor.ShardRecordProcessorFactory;