Adding logic to garbage collect active streams from stale streams list.
This commit is contained in:
parent
f51657f6f7
commit
45387bfd74
1 changed files with 16 additions and 6 deletions
|
|
@ -43,7 +43,6 @@ import java.util.concurrent.TimeUnit;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import lombok.AccessLevel;
|
import lombok.AccessLevel;
|
||||||
import lombok.Data;
|
|
||||||
import lombok.Getter;
|
import lombok.Getter;
|
||||||
import lombok.NoArgsConstructor;
|
import lombok.NoArgsConstructor;
|
||||||
import lombok.NonNull;
|
import lombok.NonNull;
|
||||||
|
|
@ -518,14 +517,19 @@ public class Scheduler implements Runnable {
|
||||||
// Now let's scan the streamIdentifiers eligible for deferred deletion and delete them.
|
// Now let's scan the streamIdentifiers eligible for deferred deletion and delete them.
|
||||||
// StreamIdentifiers are eligible for deletion only when the deferment period has elapsed and
|
// StreamIdentifiers are eligible for deletion only when the deferment period has elapsed and
|
||||||
// the streamIdentifiers are not present in the latest snapshot.
|
// the streamIdentifiers are not present in the latest snapshot.
|
||||||
final Set<StreamIdentifier> streamIdsToBeDeleted = staleStreamDeletionMap.keySet().stream()
|
final Map<Boolean, Set<StreamIdentifier>> staleStreamIdDeletionDecisionMap = staleStreamDeletionMap.keySet()
|
||||||
|
.stream().collect(Collectors
|
||||||
|
.partitioningBy(streamIdentifier -> newStreamConfigMap.containsKey(streamIdentifier),
|
||||||
|
Collectors.toSet()));
|
||||||
|
final Set<StreamIdentifier> staleStreamIdsToBeDeleted = staleStreamIdDeletionDecisionMap.get(false).stream()
|
||||||
.filter(streamIdentifier ->
|
.filter(streamIdentifier ->
|
||||||
Duration.between(staleStreamDeletionMap.get(streamIdentifier), Instant.now()).toMillis()
|
Duration.between(staleStreamDeletionMap.get(streamIdentifier), Instant.now()).toMillis()
|
||||||
>= getOldStreamDeferredDeletionPeriodMillis() &&
|
>= getOldStreamDeferredDeletionPeriodMillis()).collect(Collectors.toSet());
|
||||||
!newStreamConfigMap.containsKey(streamIdentifier))
|
streamsSynced.addAll(deleteMultiStreamLeases(staleStreamIdsToBeDeleted));
|
||||||
.collect(Collectors.toSet());
|
|
||||||
|
|
||||||
streamsSynced.addAll(deleteMultiStreamLeases(streamIdsToBeDeleted));
|
// Purge the active streams from stale streams list.
|
||||||
|
final Set<StreamIdentifier> staleStreamIdsToBeRevived = staleStreamIdDeletionDecisionMap.get(true);
|
||||||
|
removeActiveStreamsFromStaleStreamsList(staleStreamIdsToBeRevived);
|
||||||
|
|
||||||
streamSyncWatch.reset().start();
|
streamSyncWatch.reset().start();
|
||||||
}
|
}
|
||||||
|
|
@ -558,6 +562,12 @@ public class Scheduler implements Runnable {
|
||||||
return (List<MultiStreamLease>) ((List) leaseCoordinator.leaseRefresher().listLeases());
|
return (List<MultiStreamLease>) ((List) leaseCoordinator.leaseRefresher().listLeases());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void removeActiveStreamsFromStaleStreamsList(Set<StreamIdentifier> streamIdentifiers) {
|
||||||
|
for(StreamIdentifier streamIdentifier : streamIdentifiers) {
|
||||||
|
staleStreamDeletionMap.remove(streamIdentifier);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private Set<StreamIdentifier> deleteMultiStreamLeases(Set<StreamIdentifier> streamIdentifiers)
|
private Set<StreamIdentifier> deleteMultiStreamLeases(Set<StreamIdentifier> streamIdentifiers)
|
||||||
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
|
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
|
||||||
final Set<StreamIdentifier> streamsSynced = new HashSet<>();
|
final Set<StreamIdentifier> streamsSynced = new HashSet<>();
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue