From 61e500d4fdf0d1e8695c4cb2804739c549d0e47a Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Tue, 21 Apr 2020 22:17:51 -0700 Subject: [PATCH] Adding logging for streanms to be deleted --- .../software/amazon/kinesis/coordinator/Scheduler.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index 22be7fc7..12cbae8f 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -458,8 +458,7 @@ public class Scheduler implements Runnable { if (shouldSyncStreamsNow()) { final Map newStreamConfigMap = new HashMap<>(); - final long waitPeriodToDeleteOldStreamsMillis = multiStreamTracker.waitPeriodToDeleteOldStreams() - .toMillis(); + final Duration waitPeriodToDeleteOldStreams = multiStreamTracker.waitPeriodToDeleteOldStreams(); // Making an immutable copy newStreamConfigMap.putAll(multiStreamTracker.streamConfigList().stream() .collect(Collectors.toMap(sc -> sc.streamIdentifier(), sc -> sc))); @@ -525,13 +524,17 @@ public class Scheduler implements Runnable { final Set staleStreamIdsToBeDeleted = staleStreamIdDeletionDecisionMap.get(false).stream() .filter(streamIdentifier -> Duration.between(staleStreamDeletionMap.get(streamIdentifier), Instant.now()).toMillis() - >= waitPeriodToDeleteOldStreamsMillis).collect(Collectors.toSet()); + >= waitPeriodToDeleteOldStreams.toMillis()).collect(Collectors.toSet()); streamsSynced.addAll(deleteMultiStreamLeases(staleStreamIdsToBeDeleted)); // Purge the active streams from stale streams list. final Set staleStreamIdsToBeRevived = staleStreamIdDeletionDecisionMap.get(true); removeActiveStreamsFromStaleStreamsList(staleStreamIdsToBeRevived); + log.warn("Streams enqueued for deletion for lease table cleanup along with their scheduled time for deletion: {} ", + staleStreamDeletionMap.entrySet().stream().collect(Collectors + .toMap(Map.Entry::getKey, entry -> entry.getValue().plus(waitPeriodToDeleteOldStreams)))); + streamSyncWatch.reset().start(); } return streamsSynced;