Adding logging for streanms to be deleted
This commit is contained in:
parent
596e3ee797
commit
61e500d4fd
1 changed files with 6 additions and 3 deletions
|
|
@ -458,8 +458,7 @@ public class Scheduler implements Runnable {
|
||||||
|
|
||||||
if (shouldSyncStreamsNow()) {
|
if (shouldSyncStreamsNow()) {
|
||||||
final Map<StreamIdentifier, StreamConfig> newStreamConfigMap = new HashMap<>();
|
final Map<StreamIdentifier, StreamConfig> newStreamConfigMap = new HashMap<>();
|
||||||
final long waitPeriodToDeleteOldStreamsMillis = multiStreamTracker.waitPeriodToDeleteOldStreams()
|
final Duration waitPeriodToDeleteOldStreams = multiStreamTracker.waitPeriodToDeleteOldStreams();
|
||||||
.toMillis();
|
|
||||||
// Making an immutable copy
|
// Making an immutable copy
|
||||||
newStreamConfigMap.putAll(multiStreamTracker.streamConfigList().stream()
|
newStreamConfigMap.putAll(multiStreamTracker.streamConfigList().stream()
|
||||||
.collect(Collectors.toMap(sc -> sc.streamIdentifier(), sc -> sc)));
|
.collect(Collectors.toMap(sc -> sc.streamIdentifier(), sc -> sc)));
|
||||||
|
|
@ -525,13 +524,17 @@ public class Scheduler implements Runnable {
|
||||||
final Set<StreamIdentifier> staleStreamIdsToBeDeleted = staleStreamIdDeletionDecisionMap.get(false).stream()
|
final Set<StreamIdentifier> staleStreamIdsToBeDeleted = staleStreamIdDeletionDecisionMap.get(false).stream()
|
||||||
.filter(streamIdentifier ->
|
.filter(streamIdentifier ->
|
||||||
Duration.between(staleStreamDeletionMap.get(streamIdentifier), Instant.now()).toMillis()
|
Duration.between(staleStreamDeletionMap.get(streamIdentifier), Instant.now()).toMillis()
|
||||||
>= waitPeriodToDeleteOldStreamsMillis).collect(Collectors.toSet());
|
>= waitPeriodToDeleteOldStreams.toMillis()).collect(Collectors.toSet());
|
||||||
streamsSynced.addAll(deleteMultiStreamLeases(staleStreamIdsToBeDeleted));
|
streamsSynced.addAll(deleteMultiStreamLeases(staleStreamIdsToBeDeleted));
|
||||||
|
|
||||||
// Purge the active streams from stale streams list.
|
// Purge the active streams from stale streams list.
|
||||||
final Set<StreamIdentifier> staleStreamIdsToBeRevived = staleStreamIdDeletionDecisionMap.get(true);
|
final Set<StreamIdentifier> staleStreamIdsToBeRevived = staleStreamIdDeletionDecisionMap.get(true);
|
||||||
removeActiveStreamsFromStaleStreamsList(staleStreamIdsToBeRevived);
|
removeActiveStreamsFromStaleStreamsList(staleStreamIdsToBeRevived);
|
||||||
|
|
||||||
|
log.warn("Streams enqueued for deletion for lease table cleanup along with their scheduled time for deletion: {} ",
|
||||||
|
staleStreamDeletionMap.entrySet().stream().collect(Collectors
|
||||||
|
.toMap(Map.Entry::getKey, entry -> entry.getValue().plus(waitPeriodToDeleteOldStreams))));
|
||||||
|
|
||||||
streamSyncWatch.reset().start();
|
streamSyncWatch.reset().start();
|
||||||
}
|
}
|
||||||
return streamsSynced;
|
return streamsSynced;
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue