Removing one of the lease clean up strategy as it is no longer required with distributed lease cleanup

This commit is contained in:
Ashwin Giridharan 2020-06-12 09:28:30 -07:00
parent ce38178399
commit c1cbb6cf6d
3 changed files with 16 additions and 63 deletions

View file

@ -113,6 +113,7 @@ public class Scheduler implements Runnable {
private static final long MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 1 * 1000L;
private static final long MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 30 * 1000L;
private static final long NEW_STREAM_CHECK_INTERVAL_MILLIS = 1 * 60 * 1000L;
private static final boolean SHOULD_DO_LEASE_SYNC_FOR_OLD_STREAMS = false;
private static final String MULTI_STREAM_TRACKER = "MultiStreamTracker";
private static final String ACTIVE_STREAMS_COUNT = "ActiveStreams.Count";
private static final String PENDING_STREAMS_DELETION_COUNT = "StreamsPendingDeletion.Count";
@ -491,10 +492,9 @@ public class Scheduler implements Runnable {
}
};
if (formerStreamsLeasesDeletionStrategy.shouldCleanupLeasesForDeletedStreams()) {
if (SHOULD_DO_LEASE_SYNC_FOR_OLD_STREAMS) {
// We do lease sync for old streams, before leaving to the deletion strategy to delete leases for
// strategy detected leases. Also, for deleted streams we expect the shard sync to remove the
// leases.
// strategy detected leases.
Iterator<StreamIdentifier> currentSetOfStreamsIter = currentStreamConfigMap.keySet().iterator();
while (currentSetOfStreamsIter.hasNext()) {
StreamIdentifier streamIdentifier = currentSetOfStreamsIter.next();
@ -533,13 +533,13 @@ public class Scheduler implements Runnable {
currentStreamConfigMap.keySet().stream().forEach(streamIdentifier -> enqueueStreamLeaseDeletionOperation.accept(streamIdentifier));
} else if (formerStreamsLeasesDeletionStrategy.leaseDeletionType() == StreamsLeasesDeletionType.PROVIDED_STREAMS_DEFERRED_DELETION) {
Optional.ofNullable(formerStreamsLeasesDeletionStrategy.streamIdentifiers()).ifPresent(
Optional.ofNullable(formerStreamsLeasesDeletionStrategy.streamIdentifiersForLeaseCleanup()).ifPresent(
streamIdentifiers -> streamIdentifiers.stream().forEach(streamIdentifier -> enqueueStreamLeaseDeletionOperation.accept(streamIdentifier)));
}
// Now let's scan the streamIdentifiers eligible for deferred deletion and delete them.
// Now let's scan the streamIdentifiersForLeaseCleanup eligible for deferred deletion and delete them.
// StreamIdentifiers are eligible for deletion only when the deferment period has elapsed and
// the streamIdentifiers are not present in the latest snapshot.
// the streamIdentifiersForLeaseCleanup are not present in the latest snapshot.
final Map<Boolean, Set<StreamIdentifier>> staleStreamIdDeletionDecisionMap = staleStreamDeletionMap.keySet().stream().collect(Collectors
.partitioningBy(streamIdentifier -> newStreamConfigMap.containsKey(streamIdentifier), Collectors.toSet()));
final Set<StreamIdentifier> staleStreamIdsToBeDeleted = staleStreamIdDeletionDecisionMap.get(false).stream().filter(streamIdentifier ->

View file

@ -29,7 +29,7 @@ public interface FormerStreamsLeasesDeletionStrategy {
* StreamIdentifiers for which leases needs to be cleaned up in the lease table.
* @return
*/
List<StreamIdentifier> streamIdentifiers();
List<StreamIdentifier> streamIdentifiersForLeaseCleanup();
/**
* Duration to wait before deleting the leases for this stream.
@ -43,12 +43,6 @@ public interface FormerStreamsLeasesDeletionStrategy {
*/
StreamsLeasesDeletionType leaseDeletionType();
/**
* Should the leases be cleaned up for deleted streams
* @return true if leases be cleaned up for deleted streams; false otherwise.
*/
boolean shouldCleanupLeasesForDeletedStreams();
/**
* StreamsLeasesDeletionType identifying the different lease cleanup strategies.
*/
@ -64,7 +58,7 @@ public interface FormerStreamsLeasesDeletionStrategy {
final class NoLeaseDeletionStrategy implements FormerStreamsLeasesDeletionStrategy {
@Override
public final List<StreamIdentifier> streamIdentifiers() {
public final List<StreamIdentifier> streamIdentifiersForLeaseCleanup() {
throw new UnsupportedOperationException("StreamIdentifiers not required");
}
@ -77,37 +71,6 @@ public interface FormerStreamsLeasesDeletionStrategy {
public final StreamsLeasesDeletionType leaseDeletionType() {
return StreamsLeasesDeletionType.NO_STREAMS_LEASES_DELETION;
}
@Override
public final boolean shouldCleanupLeasesForDeletedStreams() {
return false;
}
}
/**
* Strategy for not cleaning up leases for former streams.
*/
final class OnlyDeletedStreamsLeasesCleanupStrategy implements FormerStreamsLeasesDeletionStrategy {
@Override
public final List<StreamIdentifier> streamIdentifiers() {
throw new UnsupportedOperationException("StreamIdentifiers not required");
}
@Override
public final Duration waitPeriodToDeleteFormerStreams() {
return Duration.ZERO;
}
@Override
public final StreamsLeasesDeletionType leaseDeletionType() {
return StreamsLeasesDeletionType.NO_STREAMS_LEASES_DELETION;
}
@Override
public final boolean shouldCleanupLeasesForDeletedStreams() {
return true;
}
}
/**
@ -117,7 +80,7 @@ public interface FormerStreamsLeasesDeletionStrategy {
abstract class AutoDetectionAndDeferredDeletionStrategy implements FormerStreamsLeasesDeletionStrategy {
@Override
public final List<StreamIdentifier> streamIdentifiers() {
public final List<StreamIdentifier> streamIdentifiersForLeaseCleanup() {
throw new UnsupportedOperationException("StreamIdentifiers not required");
}
@ -125,15 +88,10 @@ public interface FormerStreamsLeasesDeletionStrategy {
public final StreamsLeasesDeletionType leaseDeletionType() {
return StreamsLeasesDeletionType.FORMER_STREAMS_AUTO_DETECTION_DEFERRED_DELETION;
}
@Override
public boolean shouldCleanupLeasesForDeletedStreams() {
return false;
}
}
/**
* Strategy to detect the streams for deletion through {@link #streamIdentifiers()} provided by customer at runtime
* Strategy to detect the streams for deletion through {@link #streamIdentifiersForLeaseCleanup()} provided by customer at runtime
* and do deferred deletion based on {@link #waitPeriodToDeleteFormerStreams()}
*/
abstract class ProvidedStreamsDeferredDeletionStrategy implements FormerStreamsLeasesDeletionStrategy {
@ -142,11 +100,6 @@ public interface FormerStreamsLeasesDeletionStrategy {
public final StreamsLeasesDeletionType leaseDeletionType() {
return StreamsLeasesDeletionType.PROVIDED_STREAMS_DEFERRED_DELETION;
}
@Override
public boolean shouldCleanupLeasesForDeletedStreams() {
return false;
}
}
}

View file

@ -482,7 +482,7 @@ public class SchedulerTest {
public final void testMultiStreamStaleStreamsAreNotDeletedImmediatelyProvidedListStrategy()
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new ProvidedStreamsDeferredDeletionStrategy() {
@Override public List<StreamIdentifier> streamIdentifiers() {
@Override public List<StreamIdentifier> streamIdentifiersForLeaseCleanup() {
return null;
}
@ -497,7 +497,7 @@ public class SchedulerTest {
public final void testMultiStreamStaleStreamsAreNotDeletedImmediatelyProvidedListStrategy2()
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new ProvidedStreamsDeferredDeletionStrategy() {
@Override public List<StreamIdentifier> streamIdentifiers() {
@Override public List<StreamIdentifier> streamIdentifiersForLeaseCleanup() {
return IntStream.range(1, 3).mapToObj(streamId -> StreamIdentifier.multiStreamInstance(
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))).collect(
Collectors.toCollection(ArrayList::new));
@ -555,7 +555,7 @@ public class SchedulerTest {
public final void testMultiStreamStaleStreamsAreDeletedAfterDefermentPeriodWithProvidedListStrategy()
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new ProvidedStreamsDeferredDeletionStrategy() {
@Override public List<StreamIdentifier> streamIdentifiers() {
@Override public List<StreamIdentifier> streamIdentifiersForLeaseCleanup() {
return null;
}
@ -575,7 +575,7 @@ public class SchedulerTest {
public final void testMultiStreamStaleStreamsAreDeletedAfterDefermentPeriodWithProvidedListStrategy2()
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new ProvidedStreamsDeferredDeletionStrategy() {
@Override public List<StreamIdentifier> streamIdentifiers() {
@Override public List<StreamIdentifier> streamIdentifiersForLeaseCleanup() {
return IntStream.range(1, 3).mapToObj(streamId -> StreamIdentifier.multiStreamInstance(
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))).collect(
Collectors.toCollection(ArrayList::new));
@ -639,7 +639,7 @@ public class SchedulerTest {
public final void testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediatelyWithProvidedListStrategy()
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new ProvidedStreamsDeferredDeletionStrategy() {
@Override public List<StreamIdentifier> streamIdentifiers() {
@Override public List<StreamIdentifier> streamIdentifiersForLeaseCleanup() {
return null;
}
@ -654,7 +654,7 @@ public class SchedulerTest {
public final void testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediatelyWithProvidedListStrategy2()
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new ProvidedStreamsDeferredDeletionStrategy() {
@Override public List<StreamIdentifier> streamIdentifiers() {
@Override public List<StreamIdentifier> streamIdentifiersForLeaseCleanup() {
return IntStream.range(1, 3)
.mapToObj(streamId -> StreamIdentifier.multiStreamInstance(
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)))