Introducing lease deletion strategy for multistreaming

This commit is contained in:
Ashwin Giridharan 2020-04-24 00:01:19 -07:00
parent 167ecfb08c
commit 038524e0b1
6 changed files with 399 additions and 92 deletions

View file

@ -124,7 +124,10 @@ public class ConfigsBuilder {
* @param workerIdentifier * @param workerIdentifier
* @param shardRecordProcessorFactory * @param shardRecordProcessorFactory
*/ */
public ConfigsBuilder(@NonNull String streamName, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { public ConfigsBuilder(@NonNull String streamName, @NonNull String applicationName,
@NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient,
@NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier,
@NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {
this.appStreamTracker = Either.right(streamName); this.appStreamTracker = Either.right(streamName);
this.applicationName = applicationName; this.applicationName = applicationName;
this.kinesisClient = kinesisClient; this.kinesisClient = kinesisClient;
@ -144,7 +147,10 @@ public class ConfigsBuilder {
* @param workerIdentifier * @param workerIdentifier
* @param shardRecordProcessorFactory * @param shardRecordProcessorFactory
*/ */
public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName,
@NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient,
@NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier,
@NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {
this.appStreamTracker = Either.left(multiStreamTracker); this.appStreamTracker = Either.left(multiStreamTracker);
this.applicationName = applicationName; this.applicationName = applicationName;
this.kinesisClient = kinesisClient; this.kinesisClient = kinesisClient;

View file

@ -26,7 +26,6 @@ import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
@ -40,6 +39,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import lombok.AccessLevel; import lombok.AccessLevel;
@ -83,7 +83,11 @@ import software.amazon.kinesis.lifecycle.TaskResult;
import software.amazon.kinesis.metrics.CloudWatchMetricsFactory; import software.amazon.kinesis.metrics.CloudWatchMetricsFactory;
import software.amazon.kinesis.metrics.MetricsConfig; import software.amazon.kinesis.metrics.MetricsConfig;
import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.MetricsLevel;
import software.amazon.kinesis.metrics.MetricsScope;
import software.amazon.kinesis.metrics.MetricsUtil;
import software.amazon.kinesis.processor.Checkpointer; import software.amazon.kinesis.processor.Checkpointer;
import software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy;
import software.amazon.kinesis.processor.MultiStreamTracker; import software.amazon.kinesis.processor.MultiStreamTracker;
import software.amazon.kinesis.processor.ProcessorConfig; import software.amazon.kinesis.processor.ProcessorConfig;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory; import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
@ -92,6 +96,8 @@ import software.amazon.kinesis.retrieval.AggregatorUtil;
import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RetrievalConfig; import software.amazon.kinesis.retrieval.RetrievalConfig;
import static software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy.StreamsLeasesDeletionType;
/** /**
* *
*/ */
@ -106,6 +112,10 @@ public class Scheduler implements Runnable {
private static final long MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 30 * 1000L; private static final long MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 30 * 1000L;
private static final long HASH_RANGE_COVERAGE_CHECK_FREQUENCY_MILLIS = 5000L; private static final long HASH_RANGE_COVERAGE_CHECK_FREQUENCY_MILLIS = 5000L;
private static final long NEW_STREAM_CHECK_INTERVAL_MILLIS = 1 * 60 * 1000L; private static final long NEW_STREAM_CHECK_INTERVAL_MILLIS = 1 * 60 * 1000L;
private static final String MULTI_STREAM_TRACKER = "MultiStreamTracker";
private static final String ACTIVE_STREAMS_COUNT = "ActiveStreams.Count";
private static final String PENDING_STREAMS_DELETION_COUNT = "StreamsPendingDeletion.Count";
private static final String DELETED_STREAMS_COUNT = "DeletedStreams.Count";
private SchedulerLog slog = new SchedulerLog(); private SchedulerLog slog = new SchedulerLog();
@ -143,6 +153,7 @@ public class Scheduler implements Runnable {
private final boolean isMultiStreamMode; private final boolean isMultiStreamMode;
private final Map<StreamIdentifier, StreamConfig> currentStreamConfigMap; private final Map<StreamIdentifier, StreamConfig> currentStreamConfigMap;
private MultiStreamTracker multiStreamTracker; private MultiStreamTracker multiStreamTracker;
private FormerStreamsLeasesDeletionStrategy formerStreamsLeasesDeletionStrategy;
private final long listShardsBackoffTimeMillis; private final long listShardsBackoffTimeMillis;
private final int maxListShardsRetryAttempts; private final int maxListShardsRetryAttempts;
private final LeaseRefresher leaseRefresher; private final LeaseRefresher leaseRefresher;
@ -212,6 +223,7 @@ public class Scheduler implements Runnable {
this.currentStreamConfigMap = this.retrievalConfig.appStreamTracker().map( this.currentStreamConfigMap = this.retrievalConfig.appStreamTracker().map(
multiStreamTracker -> { multiStreamTracker -> {
this.multiStreamTracker = multiStreamTracker; this.multiStreamTracker = multiStreamTracker;
this.formerStreamsLeasesDeletionStrategy = multiStreamTracker.formerStreamsLeasesDeletionStrategy();
return multiStreamTracker.streamConfigList().stream() return multiStreamTracker.streamConfigList().stream()
.collect(Collectors.toMap(sc -> sc.streamIdentifier(), sc -> sc)); .collect(Collectors.toMap(sc -> sc.streamIdentifier(), sc -> sc));
}, },
@ -457,8 +469,12 @@ public class Scheduler implements Runnable {
final Set<StreamIdentifier> streamsSynced = new HashSet<>(); final Set<StreamIdentifier> streamsSynced = new HashSet<>();
if (shouldSyncStreamsNow()) { if (shouldSyncStreamsNow()) {
final MetricsScope metricsScope = MetricsUtil.createMetricsWithOperation(metricsFactory, MULTI_STREAM_TRACKER);
try {
final Map<StreamIdentifier, StreamConfig> newStreamConfigMap = new HashMap<>(); final Map<StreamIdentifier, StreamConfig> newStreamConfigMap = new HashMap<>();
final Duration waitPeriodToDeleteOldStreams = multiStreamTracker.waitPeriodToDeleteOldStreams(); final Duration waitPeriodToDeleteOldStreams = formerStreamsLeasesDeletionStrategy.waitPeriodToDeleteFormerStreams();
// Making an immutable copy // Making an immutable copy
newStreamConfigMap.putAll(multiStreamTracker.streamConfigList().stream() newStreamConfigMap.putAll(multiStreamTracker.streamConfigList().stream()
.collect(Collectors.toMap(sc -> sc.streamIdentifier(), sc -> sc))); .collect(Collectors.toMap(sc -> sc.streamIdentifier(), sc -> sc)));
@ -487,6 +503,13 @@ public class Scheduler implements Runnable {
} }
} }
final Consumer<StreamIdentifier> enqueueStreamLeaseDeletionOperation = streamIdentifier -> {
if (!newStreamConfigMap.containsKey(streamIdentifier)) {
staleStreamDeletionMap.putIfAbsent(streamIdentifier, Instant.now());
}
};
if (formerStreamsLeasesDeletionStrategy.leaseDeletionType() == StreamsLeasesDeletionType.FORMER_STREAMS_AUTO_DETECTION_DEFERRED_DELETION) {
// Now, we are identifying the stale/old streams and enqueuing it for deferred deletion. // Now, we are identifying the stale/old streams and enqueuing it for deferred deletion.
// It is assumed that all the workers will always have the latest and consistent snapshot of streams // It is assumed that all the workers will always have the latest and consistent snapshot of streams
// from the multiStreamTracker. // from the multiStreamTracker.
@ -506,43 +529,48 @@ public class Scheduler implements Runnable {
// before attempting to delete it, we will be deferring the leases deletion based on the // before attempting to delete it, we will be deferring the leases deletion based on the
// defer time period. // defer time period.
Iterator<StreamIdentifier> currentStreamConfigIter = currentStreamConfigMap.keySet().iterator(); currentStreamConfigMap.keySet().stream().forEach(streamIdentifier -> enqueueStreamLeaseDeletionOperation.accept(streamIdentifier));
while (currentStreamConfigIter.hasNext()) {
StreamIdentifier streamIdentifier = currentStreamConfigIter.next(); } else if (formerStreamsLeasesDeletionStrategy.leaseDeletionType() == StreamsLeasesDeletionType.PROVIDED_STREAMS_DEFERRED_DELETION) {
if (!newStreamConfigMap.containsKey(streamIdentifier)) { Optional.ofNullable(formerStreamsLeasesDeletionStrategy.streamIdentifiers()).ifPresent(
staleStreamDeletionMap.putIfAbsent(streamIdentifier, Instant.now()); streamIdentifiers -> streamIdentifiers.stream().forEach(streamIdentifier -> enqueueStreamLeaseDeletionOperation.accept(streamIdentifier)));
}
} }
// Now let's scan the streamIdentifiers eligible for deferred deletion and delete them. // Now let's scan the streamIdentifiers eligible for deferred deletion and delete them.
// StreamIdentifiers are eligible for deletion only when the deferment period has elapsed and // StreamIdentifiers are eligible for deletion only when the deferment period has elapsed and
// the streamIdentifiers are not present in the latest snapshot. // the streamIdentifiers are not present in the latest snapshot.
final Map<Boolean, Set<StreamIdentifier>> staleStreamIdDeletionDecisionMap = staleStreamDeletionMap.keySet() final Map<Boolean, Set<StreamIdentifier>> staleStreamIdDeletionDecisionMap = staleStreamDeletionMap.keySet().stream().collect(Collectors
.stream().collect(Collectors .partitioningBy(streamIdentifier -> newStreamConfigMap.containsKey(streamIdentifier), Collectors.toSet()));
.partitioningBy(streamIdentifier -> newStreamConfigMap.containsKey(streamIdentifier), final Set<StreamIdentifier> staleStreamIdsToBeDeleted = staleStreamIdDeletionDecisionMap.get(false).stream().filter(streamIdentifier ->
Collectors.toSet())); Duration.between(staleStreamDeletionMap.get(streamIdentifier), Instant.now()).toMillis() >= waitPeriodToDeleteOldStreams.toMillis()).collect(Collectors.toSet());
final Set<StreamIdentifier> staleStreamIdsToBeDeleted = staleStreamIdDeletionDecisionMap.get(false).stream() final Set<StreamIdentifier> deletedStreamsLeases = deleteMultiStreamLeases(staleStreamIdsToBeDeleted);
.filter(streamIdentifier -> streamsSynced.addAll(deletedStreamsLeases);
Duration.between(staleStreamDeletionMap.get(streamIdentifier), Instant.now()).toMillis()
>= waitPeriodToDeleteOldStreams.toMillis()).collect(Collectors.toSet());
streamsSynced.addAll(deleteMultiStreamLeases(staleStreamIdsToBeDeleted));
// Purge the active streams from stale streams list. // Purge the active streams from stale streams list.
final Set<StreamIdentifier> staleStreamIdsToBeRevived = staleStreamIdDeletionDecisionMap.get(true); final Set<StreamIdentifier> staleStreamIdsToBeRevived = staleStreamIdDeletionDecisionMap.get(true);
removeActiveStreamsFromStaleStreamsList(staleStreamIdsToBeRevived); removeStreamsFromStaleStreamsList(staleStreamIdsToBeRevived);
log.warn("Streams enqueued for deletion for lease table cleanup along with their scheduled time for deletion: {} ", log.warn(
staleStreamDeletionMap.entrySet().stream().collect(Collectors "Streams enqueued for deletion for lease table cleanup along with their scheduled time for deletion: {} ",
.toMap(Map.Entry::getKey, entry -> entry.getValue().plus(waitPeriodToDeleteOldStreams)))); staleStreamDeletionMap.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
entry -> entry.getValue().plus(waitPeriodToDeleteOldStreams))));
streamSyncWatch.reset().start(); streamSyncWatch.reset().start();
MetricsUtil.addCount(metricsScope, ACTIVE_STREAMS_COUNT, newStreamConfigMap.size(), MetricsLevel.SUMMARY);
MetricsUtil.addCount(metricsScope, PENDING_STREAMS_DELETION_COUNT, staleStreamDeletionMap.size(),
MetricsLevel.SUMMARY);
MetricsUtil.addCount(metricsScope, DELETED_STREAMS_COUNT, deletedStreamsLeases.size(), MetricsLevel.SUMMARY);
} finally {
MetricsUtil.endScope(metricsScope);
}
} }
return streamsSynced; return streamsSynced;
} }
@VisibleForTesting @VisibleForTesting boolean shouldSyncStreamsNow() {
boolean shouldSyncStreamsNow() { return isMultiStreamMode &&
return isMultiStreamMode && (streamSyncWatch.elapsed(TimeUnit.MILLISECONDS) > NEW_STREAM_CHECK_INTERVAL_MILLIS); (streamSyncWatch.elapsed(TimeUnit.MILLISECONDS) > NEW_STREAM_CHECK_INTERVAL_MILLIS);
} }
private void syncStreamsFromLeaseTableOnAppInit(List<MultiStreamLease> leases) { private void syncStreamsFromLeaseTableOnAppInit(List<MultiStreamLease> leases) {
@ -561,7 +589,7 @@ public class Scheduler implements Runnable {
return (List<MultiStreamLease>) ((List) leaseCoordinator.leaseRefresher().listLeases()); return (List<MultiStreamLease>) ((List) leaseCoordinator.leaseRefresher().listLeases());
} }
private void removeActiveStreamsFromStaleStreamsList(Set<StreamIdentifier> streamIdentifiers) { private void removeStreamsFromStaleStreamsList(Set<StreamIdentifier> streamIdentifiers) {
for(StreamIdentifier streamIdentifier : streamIdentifiers) { for(StreamIdentifier streamIdentifier : streamIdentifiers) {
staleStreamDeletionMap.remove(streamIdentifier); staleStreamDeletionMap.remove(streamIdentifier);
} }

View file

@ -94,6 +94,11 @@ public class MetricsUtil {
metricsScope.addData(metricName, success ? 1 : 0, StandardUnit.COUNT, metricsLevel); metricsScope.addData(metricName, success ? 1 : 0, StandardUnit.COUNT, metricsLevel);
} }
public static void addCount(@NonNull final MetricsScope metricsScope, final String dimension,
final long count, @NonNull final MetricsLevel metricsLevel) {
metricsScope.addData(dimension, count, StandardUnit.COUNT, metricsLevel);
}
public static void endScope(@NonNull final MetricsScope metricsScope) { public static void endScope(@NonNull final MetricsScope metricsScope) {
metricsScope.end(); metricsScope.end();
} }

View file

@ -0,0 +1,109 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.processor;
import software.amazon.kinesis.common.StreamIdentifier;
import java.time.Duration;
import java.util.List;
/**
* Strategy for cleaning up the leases for former streams.
*/
public interface FormerStreamsLeasesDeletionStrategy {
/**
* StreamIdentifiers for which leases needs to be cleaned up in the lease table.
* @return
*/
List<StreamIdentifier> streamIdentifiers();
/**
* Duration to wait before deleting the leases for this stream.
* @return
*/
Duration waitPeriodToDeleteFormerStreams();
/**
* Strategy type for deleting the leases of former streams
* @return
*/
StreamsLeasesDeletionType leaseDeletionType();
/**
* StreamsLeasesDeletionType identifying the different lease cleanup strategies.
*/
enum StreamsLeasesDeletionType {
NO_STREAMS_LEASES_DELETION,
FORMER_STREAMS_AUTO_DETECTION_DEFERRED_DELETION,
PROVIDED_STREAMS_DEFERRED_DELETION
}
/**
* Strategy for not cleaning up leases for former streams.
*/
final class NoLeaseDeletionStrategy implements FormerStreamsLeasesDeletionStrategy {
@Override
public final List<StreamIdentifier> streamIdentifiers() {
throw new UnsupportedOperationException("StreamIdentifiers not required");
}
@Override
public final Duration waitPeriodToDeleteFormerStreams() {
return Duration.ZERO;
}
@Override
public final StreamsLeasesDeletionType leaseDeletionType() {
return StreamsLeasesDeletionType.NO_STREAMS_LEASES_DELETION;
}
}
/**
* Strategy for auto detection the old of former streams based on the {@link MultiStreamTracker#streamConfigList()}
* and do deferred deletion based on {@link #waitPeriodToDeleteFormerStreams()}
*/
abstract class AutoDetectionAndDeferredDeletionStrategy implements FormerStreamsLeasesDeletionStrategy {
@Override
public final List<StreamIdentifier> streamIdentifiers() {
throw new UnsupportedOperationException("StreamIdentifiers not required");
}
@Override
public final StreamsLeasesDeletionType leaseDeletionType() {
return StreamsLeasesDeletionType.FORMER_STREAMS_AUTO_DETECTION_DEFERRED_DELETION;
}
}
/**
* Strategy to detect the streams for deletion through {@link #streamIdentifiers()} provided by customer at runtime
* and do deferred deletion based on {@link #waitPeriodToDeleteFormerStreams()}
*/
abstract class ProvidedStreamsDeferredDeletionStrategy implements FormerStreamsLeasesDeletionStrategy {
@Override
public final StreamsLeasesDeletionType leaseDeletionType() {
return StreamsLeasesDeletionType.PROVIDED_STREAMS_DEFERRED_DELETION;
}
}
}

View file

@ -17,7 +17,6 @@ package software.amazon.kinesis.processor;
import software.amazon.kinesis.common.StreamConfig; import software.amazon.kinesis.common.StreamConfig;
import java.time.Duration;
import java.util.List; import java.util.List;
/** /**
@ -29,15 +28,18 @@ public interface MultiStreamTracker {
/** /**
* Returns the list of stream config, to be processed by the current application. * Returns the list of stream config, to be processed by the current application.
* Note that this method will be called periodically called by the KCL to learn about the new and old streams. * <b>Note that the streams list CAN be changed during the application runtime.</b>
* This method will be called periodically by the KCL to learn about the change in streams to process.
* *
* @return List of StreamConfig * @return List of StreamConfig
*/ */
List<StreamConfig> streamConfigList(); List<StreamConfig> streamConfigList();
/** /**
* Duration to wait before deleting the old streams in the lease table. * Strategy to delete leases of old streams in the lease table.
* @return Wait time before deleting old streams * <b>Note that the strategy CANNOT be changed during the application runtime.</b>
*
* @return StreamsLeasesDeletionStrategy
*/ */
Duration waitPeriodToDeleteOldStreams(); FormerStreamsLeasesDeletionStrategy formerStreamsLeasesDeletionStrategy();
} }

View file

@ -35,6 +35,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import static org.mockito.internal.verification.VerificationModeFactory.atMost; import static org.mockito.internal.verification.VerificationModeFactory.atMost;
import static software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy.*;
import java.time.Duration; import java.time.Duration;
import java.util.ArrayList; import java.util.ArrayList;
@ -94,6 +95,7 @@ import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.MetricsConfig; import software.amazon.kinesis.metrics.MetricsConfig;
import software.amazon.kinesis.processor.Checkpointer; import software.amazon.kinesis.processor.Checkpointer;
import software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy;
import software.amazon.kinesis.processor.MultiStreamTracker; import software.amazon.kinesis.processor.MultiStreamTracker;
import software.amazon.kinesis.processor.ProcessorConfig; import software.amazon.kinesis.processor.ProcessorConfig;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory; import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
@ -181,7 +183,12 @@ public class SchedulerTest {
}}; }};
when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList); when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList);
when(multiStreamTracker.waitPeriodToDeleteOldStreams()).thenReturn(Duration.ofHours(1L)); when(multiStreamTracker.formerStreamsLeasesDeletionStrategy())
.thenReturn(new AutoDetectionAndDeferredDeletionStrategy() {
@Override public Duration waitPeriodToDeleteFormerStreams() {
return Duration.ZERO;
}
});
when(leaseCoordinator.leaseRefresher()).thenReturn(dynamoDBLeaseRefresher); when(leaseCoordinator.leaseRefresher()).thenReturn(dynamoDBLeaseRefresher);
when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector); when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector);
when(shardSyncTaskManager.executeShardSyncTask()).thenReturn(new TaskResult(null)); when(shardSyncTaskManager.executeShardSyncTask()).thenReturn(new TaskResult(null));
@ -442,7 +449,56 @@ public class SchedulerTest {
} }
@Test @Test
public final void testMultiStreamStaleStreamsAreNotDeletedImmediately() public final void testMultiStreamStaleStreamsAreNotDeletedImmediatelyAutoDeletionStrategy()
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new AutoDetectionAndDeferredDeletionStrategy() {
@Override public Duration waitPeriodToDeleteFormerStreams() {
return Duration.ofHours(1);
}
});
testMultiStreamStaleStreamsAreNotDeletedImmediately(true);
}
@Test
public final void testMultiStreamStaleStreamsAreNotDeletedImmediatelyNoDeletionStrategy()
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new NoLeaseDeletionStrategy());
testMultiStreamStaleStreamsAreNotDeletedImmediately(false);
}
@Test
public final void testMultiStreamStaleStreamsAreNotDeletedImmediatelyProvidedListStrategy()
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new ProvidedStreamsDeferredDeletionStrategy() {
@Override public List<StreamIdentifier> streamIdentifiers() {
return null;
}
@Override public Duration waitPeriodToDeleteFormerStreams() {
return Duration.ofHours(1);
}
});
testMultiStreamStaleStreamsAreNotDeletedImmediately(false);
}
@Test
public final void testMultiStreamStaleStreamsAreNotDeletedImmediatelyProvidedListStrategy2()
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new ProvidedStreamsDeferredDeletionStrategy() {
@Override public List<StreamIdentifier> streamIdentifiers() {
return IntStream.range(1, 3).mapToObj(streamId -> StreamIdentifier.multiStreamInstance(
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))).collect(
Collectors.toCollection(ArrayList::new));
}
@Override public Duration waitPeriodToDeleteFormerStreams() {
return Duration.ofHours(1);
}
});
testMultiStreamStaleStreamsAreNotDeletedImmediately(true);
}
private final void testMultiStreamStaleStreamsAreNotDeletedImmediately(boolean expectPendingStreamsForDeletion)
throws DependencyException, ProvisionedThroughputException, InvalidStateException { throws DependencyException, ProvisionedThroughputException, InvalidStateException {
List<StreamConfig> streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig( List<StreamConfig> streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig(
StreamIdentifier.multiStreamInstance( StreamIdentifier.multiStreamInstance(
@ -457,6 +513,7 @@ public class SchedulerTest {
retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName) retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName)
.retrievalFactory(retrievalFactory); .retrievalFactory(retrievalFactory);
when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList1, streamConfigList2); when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList1, streamConfigList2);
scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
metricsConfig, processorConfig, retrievalConfig)); metricsConfig, processorConfig, retrievalConfig));
when(scheduler.shouldSyncStreamsNow()).thenReturn(true); when(scheduler.shouldSyncStreamsNow()).thenReturn(true);
@ -467,12 +524,59 @@ public class SchedulerTest {
Assert.assertEquals(Sets.newHashSet(), syncedStreams); Assert.assertEquals(Sets.newHashSet(), syncedStreams);
Assert.assertEquals(Sets.newHashSet(streamConfigList1), Assert.assertEquals(Sets.newHashSet(streamConfigList1),
Sets.newHashSet(scheduler.currentStreamConfigMap().values())); Sets.newHashSet(scheduler.currentStreamConfigMap().values()));
Assert.assertEquals(expectedPendingStreams, Assert.assertEquals(expectPendingStreamsForDeletion ? expectedPendingStreams : Sets.newHashSet(),
scheduler.staleStreamDeletionMap().keySet()); scheduler.staleStreamDeletionMap().keySet());
} }
@Test @Test
public final void testMultiStreamStaleStreamsAreDeletedAfterDefermentPeriod() public final void testMultiStreamStaleStreamsAreDeletedAfterDefermentPeriodWithAutoDetectionStrategy()
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new AutoDetectionAndDeferredDeletionStrategy() {
@Override public Duration waitPeriodToDeleteFormerStreams() {
return Duration.ZERO;
}
});
testMultiStreamStaleStreamsAreDeletedAfterDefermentPeriod(true, null);
}
@Test
public final void testMultiStreamStaleStreamsAreDeletedAfterDefermentPeriodWithProvidedListStrategy()
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new ProvidedStreamsDeferredDeletionStrategy() {
@Override public List<StreamIdentifier> streamIdentifiers() {
return null;
}
@Override public Duration waitPeriodToDeleteFormerStreams() {
return Duration.ZERO;
}
});
HashSet<StreamConfig> currentStreamConfigMapOverride = IntStream.range(1, 5).mapToObj(
streamId -> new StreamConfig(StreamIdentifier.multiStreamInstance(
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)),
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)))
.collect(Collectors.toCollection(HashSet::new));
testMultiStreamStaleStreamsAreDeletedAfterDefermentPeriod(false, currentStreamConfigMapOverride);
}
@Test
public final void testMultiStreamStaleStreamsAreDeletedAfterDefermentPeriodWithProvidedListStrategy2()
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new ProvidedStreamsDeferredDeletionStrategy() {
@Override public List<StreamIdentifier> streamIdentifiers() {
return IntStream.range(1, 3).mapToObj(streamId -> StreamIdentifier.multiStreamInstance(
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))).collect(
Collectors.toCollection(ArrayList::new));
}
@Override public Duration waitPeriodToDeleteFormerStreams() {
return Duration.ZERO;
}
});
testMultiStreamStaleStreamsAreDeletedAfterDefermentPeriod(true, null);
}
private final void testMultiStreamStaleStreamsAreDeletedAfterDefermentPeriod(boolean expectSyncedStreams, Set<StreamConfig> currentStreamConfigMapOverride)
throws DependencyException, ProvisionedThroughputException, InvalidStateException { throws DependencyException, ProvisionedThroughputException, InvalidStateException {
List<StreamConfig> streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig( List<StreamConfig> streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig(
StreamIdentifier.multiStreamInstance( StreamIdentifier.multiStreamInstance(
@ -490,20 +594,69 @@ public class SchedulerTest {
scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
metricsConfig, processorConfig, retrievalConfig)); metricsConfig, processorConfig, retrievalConfig));
when(scheduler.shouldSyncStreamsNow()).thenReturn(true); when(scheduler.shouldSyncStreamsNow()).thenReturn(true);
when(multiStreamTracker.waitPeriodToDeleteOldStreams()).thenReturn(Duration.ZERO);
Set<StreamIdentifier> syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases(); Set<StreamIdentifier> syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases();
Set<StreamIdentifier> expectedSyncedStreams = IntStream.range(1, 3).mapToObj(streamId -> StreamIdentifier.multiStreamInstance( Set<StreamIdentifier> expectedSyncedStreams = IntStream.range(1, 3).mapToObj(streamId -> StreamIdentifier.multiStreamInstance(
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))).collect( Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))).collect(
Collectors.toCollection(HashSet::new)); Collectors.toCollection(HashSet::new));
Assert.assertEquals(expectedSyncedStreams, syncedStreams); Assert.assertEquals(expectSyncedStreams ? expectedSyncedStreams : Sets.newHashSet(), syncedStreams);
Assert.assertEquals(Sets.newHashSet(streamConfigList2), Assert.assertEquals(currentStreamConfigMapOverride == null ? Sets.newHashSet(streamConfigList2) : currentStreamConfigMapOverride,
Sets.newHashSet(scheduler.currentStreamConfigMap().values())); Sets.newHashSet(scheduler.currentStreamConfigMap().values()));
Assert.assertEquals(Sets.newHashSet(), Assert.assertEquals(Sets.newHashSet(),
scheduler.staleStreamDeletionMap().keySet()); scheduler.staleStreamDeletionMap().keySet());
} }
@Test @Test
public final void testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediately() public final void testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediatelyWithAutoDetectionStrategy()
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new AutoDetectionAndDeferredDeletionStrategy() {
@Override public Duration waitPeriodToDeleteFormerStreams() {
return Duration.ofHours(1);
}
});
testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediately(true);
}
@Test
public final void testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediatelyWithNoDeletionStrategy()
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new NoLeaseDeletionStrategy());
testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediately(false);
}
@Test
public final void testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediatelyWithProvidedListStrategy()
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new ProvidedStreamsDeferredDeletionStrategy() {
@Override public List<StreamIdentifier> streamIdentifiers() {
return null;
}
@Override public Duration waitPeriodToDeleteFormerStreams() {
return Duration.ofHours(1);
}
});
testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediately(false);
}
@Test
public final void testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediatelyWithProvidedListStrategy2()
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new ProvidedStreamsDeferredDeletionStrategy() {
@Override public List<StreamIdentifier> streamIdentifiers() {
return IntStream.range(1, 3)
.mapToObj(streamId -> StreamIdentifier.multiStreamInstance(
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)))
.collect(Collectors.toCollection(ArrayList::new));
}
@Override public Duration waitPeriodToDeleteFormerStreams() {
return Duration.ofHours(1);
}
});
testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediately(true);
}
private final void testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediately(boolean expectPendingStreamsForDeletion)
throws DependencyException, ProvisionedThroughputException, InvalidStateException { throws DependencyException, ProvisionedThroughputException, InvalidStateException {
List<StreamConfig> streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig( List<StreamConfig> streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig(
StreamIdentifier.multiStreamInstance( StreamIdentifier.multiStreamInstance(
@ -538,7 +691,7 @@ public class SchedulerTest {
.collect(Collectors.toCollection(LinkedList::new)); .collect(Collectors.toCollection(LinkedList::new));
Assert.assertEquals(Sets.newHashSet(expectedCurrentStreamConfigs), Assert.assertEquals(Sets.newHashSet(expectedCurrentStreamConfigs),
Sets.newHashSet(scheduler.currentStreamConfigMap().values())); Sets.newHashSet(scheduler.currentStreamConfigMap().values()));
Assert.assertEquals(expectedPendingStreams, Assert.assertEquals(expectPendingStreamsForDeletion ? expectedPendingStreams: Sets.newHashSet(),
scheduler.staleStreamDeletionMap().keySet()); scheduler.staleStreamDeletionMap().keySet());
} }
@ -561,7 +714,11 @@ public class SchedulerTest {
scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
metricsConfig, processorConfig, retrievalConfig)); metricsConfig, processorConfig, retrievalConfig));
when(scheduler.shouldSyncStreamsNow()).thenReturn(true); when(scheduler.shouldSyncStreamsNow()).thenReturn(true);
when(multiStreamTracker.waitPeriodToDeleteOldStreams()).thenReturn(Duration.ZERO); when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new AutoDetectionAndDeferredDeletionStrategy() {
@Override public Duration waitPeriodToDeleteFormerStreams() {
return Duration.ZERO;
}
});
Set<StreamIdentifier> syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases(); Set<StreamIdentifier> syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases();
Set<StreamIdentifier> expectedSyncedStreams = IntStream.concat(IntStream.range(1, 3), IntStream.range(5, 7)) Set<StreamIdentifier> expectedSyncedStreams = IntStream.concat(IntStream.range(1, 3), IntStream.range(5, 7))
.mapToObj(streamId -> StreamIdentifier.multiStreamInstance( .mapToObj(streamId -> StreamIdentifier.multiStreamInstance(