diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index 478080d0..5a486a0d 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -43,7 +43,6 @@ import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; import lombok.AccessLevel; -import lombok.Data; import lombok.Getter; import lombok.NoArgsConstructor; import lombok.NonNull; @@ -518,14 +517,19 @@ public class Scheduler implements Runnable { // Now let's scan the streamIdentifiers eligible for deferred deletion and delete them. // StreamIdentifiers are eligible for deletion only when the deferment period has elapsed and // the streamIdentifiers are not present in the latest snapshot. - final Set streamIdsToBeDeleted = staleStreamDeletionMap.keySet().stream() + final Map> staleStreamIdDeletionDecisionMap = staleStreamDeletionMap.keySet() + .stream().collect(Collectors + .partitioningBy(streamIdentifier -> newStreamConfigMap.containsKey(streamIdentifier), + Collectors.toSet())); + final Set staleStreamIdsToBeDeleted = staleStreamIdDeletionDecisionMap.get(false).stream() .filter(streamIdentifier -> Duration.between(staleStreamDeletionMap.get(streamIdentifier), Instant.now()).toMillis() - >= getOldStreamDeferredDeletionPeriodMillis() && - !newStreamConfigMap.containsKey(streamIdentifier)) - .collect(Collectors.toSet()); + >= getOldStreamDeferredDeletionPeriodMillis()).collect(Collectors.toSet()); + streamsSynced.addAll(deleteMultiStreamLeases(staleStreamIdsToBeDeleted)); - streamsSynced.addAll(deleteMultiStreamLeases(streamIdsToBeDeleted)); + // Purge the active streams from stale streams list. + final Set staleStreamIdsToBeRevived = staleStreamIdDeletionDecisionMap.get(true); + removeActiveStreamsFromStaleStreamsList(staleStreamIdsToBeRevived); streamSyncWatch.reset().start(); } @@ -558,6 +562,12 @@ public class Scheduler implements Runnable { return (List) ((List) leaseCoordinator.leaseRefresher().listLeases()); } + private void removeActiveStreamsFromStaleStreamsList(Set streamIdentifiers) { + for(StreamIdentifier streamIdentifier : streamIdentifiers) { + staleStreamDeletionMap.remove(streamIdentifier); + } + } + private Set deleteMultiStreamLeases(Set streamIdentifiers) throws DependencyException, ProvisionedThroughputException, InvalidStateException { final Set streamsSynced = new HashSet<>();