Adding shouldCleanupLeasesForDeletedStreams option to FormerStreamsLeasesDeletionStrategy and introducing OnlyDeletedStreamsLeasesCleanupStrategy.

This commit is contained in:
Ashwin Giridharan 2020-05-05 17:34:37 -07:00
parent 8e4b8d789b
commit e86b1d1f01
3 changed files with 69 additions and 3 deletions

View file

@ -26,6 +26,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -509,6 +510,25 @@ public class Scheduler implements Runnable {
}
};
if (formerStreamsLeasesDeletionStrategy.shouldCleanupLeasesForDeletedStreams()) {
// We do lease sync for old streams, before leaving to the deletion strategy to delete leases for
// strategy detected leases. Also, for deleted streams we expect the shard sync to remove the
// leases.
Iterator<StreamIdentifier> currentSetOfStreamsIter = currentStreamConfigMap.keySet().iterator();
while (currentSetOfStreamsIter.hasNext()) {
StreamIdentifier streamIdentifier = currentSetOfStreamsIter.next();
if (!newStreamConfigMap.containsKey(streamIdentifier)) {
log.info("Found old/deleted stream: " + streamIdentifier
+ ". Syncing shards of that stream.");
ShardSyncTaskManager shardSyncTaskManager = createOrGetShardSyncTaskManager(
currentStreamConfigMap.get(streamIdentifier));
shardSyncTaskManager.syncShardAndLeaseInfo();
currentSetOfStreamsIter.remove();
streamsSynced.add(streamIdentifier);
}
}
}
if (formerStreamsLeasesDeletionStrategy.leaseDeletionType() == StreamsLeasesDeletionType.FORMER_STREAMS_AUTO_DETECTION_DEFERRED_DELETION) {
// Now, we are identifying the stale/old streams and enqueuing it for deferred deletion.
// It is assumed that all the workers will always have the latest and consistent snapshot of streams
@ -611,7 +631,7 @@ public class Scheduler implements Runnable {
.groupingBy(MultiStreamLease::streamIdentifier,
Collectors.toCollection(ArrayList::new)));
}
log.warn("Found old/deleted stream: " + streamIdentifier + ". Deleting leases of this stream.");
log.warn("Found old/deleted stream: " + streamIdentifier + ". Directly deleting leases of this stream.");
// Deleting leases will cause the workers to shutdown the record processors for these shards.
if (deleteMultiStreamLeases(streamIdToShardsMap.get(streamIdentifier.serialize()))) {
currentStreamConfigMap.remove(streamIdentifier);

View file

@ -38,11 +38,17 @@ public interface FormerStreamsLeasesDeletionStrategy {
Duration waitPeriodToDeleteFormerStreams();
/**
* Strategy type for deleting the leases of former streams
* Strategy type for deleting the leases of former active streams.
* @return
*/
StreamsLeasesDeletionType leaseDeletionType();
/**
* Should the leases be cleaned up for deleted streams
* @return true if leases be cleaned up for deleted streams; false otherwise.
*/
boolean shouldCleanupLeasesForDeletedStreams();
/**
* StreamsLeasesDeletionType identifying the different lease cleanup strategies.
*/
@ -71,6 +77,37 @@ public interface FormerStreamsLeasesDeletionStrategy {
public final StreamsLeasesDeletionType leaseDeletionType() {
return StreamsLeasesDeletionType.NO_STREAMS_LEASES_DELETION;
}
@Override
public final boolean shouldCleanupLeasesForDeletedStreams() {
return false;
}
}
/**
* Strategy for not cleaning up leases for former streams.
*/
final class OnlyDeletedStreamsLeasesCleanupStrategy implements FormerStreamsLeasesDeletionStrategy {
@Override
public final List<StreamIdentifier> streamIdentifiers() {
throw new UnsupportedOperationException("StreamIdentifiers not required");
}
@Override
public final Duration waitPeriodToDeleteFormerStreams() {
return Duration.ZERO;
}
@Override
public final StreamsLeasesDeletionType leaseDeletionType() {
return StreamsLeasesDeletionType.NO_STREAMS_LEASES_DELETION;
}
@Override
public final boolean shouldCleanupLeasesForDeletedStreams() {
return true;
}
}
/**
@ -88,6 +125,11 @@ public interface FormerStreamsLeasesDeletionStrategy {
public final StreamsLeasesDeletionType leaseDeletionType() {
return StreamsLeasesDeletionType.FORMER_STREAMS_AUTO_DETECTION_DEFERRED_DELETION;
}
@Override
public boolean shouldCleanupLeasesForDeletedStreams() {
return false;
}
}
/**
@ -100,6 +142,11 @@ public interface FormerStreamsLeasesDeletionStrategy {
public final StreamsLeasesDeletionType leaseDeletionType() {
return StreamsLeasesDeletionType.PROVIDED_STREAMS_DEFERRED_DELETION;
}
@Override
public boolean shouldCleanupLeasesForDeletedStreams() {
return false;
}
}
}

View file

@ -95,7 +95,6 @@ import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.MetricsConfig;
import software.amazon.kinesis.processor.Checkpointer;
import software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy;
import software.amazon.kinesis.processor.MultiStreamTracker;
import software.amazon.kinesis.processor.ProcessorConfig;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;