diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index 4d6c1fd7..478080d0 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -19,6 +19,9 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; import io.reactivex.plugins.RxJavaPlugins; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -40,6 +43,7 @@ import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; import lombok.AccessLevel; +import lombok.Data; import lombok.Getter; import lombok.NoArgsConstructor; import lombok.NonNull; @@ -103,6 +107,7 @@ public class Scheduler implements Runnable { private static final long MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 30 * 1000L; private static final long HASH_RANGE_COVERAGE_CHECK_FREQUENCY_MILLIS = 5000L; private static final long NEW_STREAM_CHECK_INTERVAL_MILLIS = 1 * 60 * 1000L; + private static final long OLD_STREAM_DEFERRED_DELETION_PERIOD_MILLIS = 1 * 60 * 60 * 1000L; private SchedulerLog slog = new SchedulerLog(); @@ -138,7 +143,6 @@ public class Scheduler implements Runnable { private final long failoverTimeMillis; private final long taskBackoffTimeMillis; private final boolean isMultiStreamMode; - // TODO : halo : make sure we generate streamConfig if entry not present. 
private final Map currentStreamConfigMap; private MultiStreamTracker multiStreamTracker; private final long listShardsBackoffTimeMillis; @@ -150,6 +154,7 @@ public class Scheduler implements Runnable { private final HierarchicalShardSyncer hierarchicalShardSyncer; private final long schedulerInitializationBackoffTimeMillis; private final LeaderDecider leaderDecider; + private final Map staleStreamDeletionMap = new HashMap<>(); // Holds consumers for shards the worker is currently tracking. Key is shard // info, value is ShardConsumer. @@ -239,8 +244,6 @@ public class Scheduler implements Runnable { this.executorService = this.coordinatorConfig.coordinatorFactory().createExecutorService(); this.diagnosticEventFactory = diagnosticEventFactory; this.diagnosticEventHandler = new DiagnosticEventLogger(); - // TODO : Halo : Handle case of no StreamConfig present in streamConfigList() for the supplied streamName. - // TODO : Pass the immutable map here instead of using mst.streamConfigList() this.shardSyncTaskManagerProvider = streamConfig -> this.leaseManagementConfig .leaseManagementFactory(leaseSerializer, isMultiStreamMode) .createShardSyncTaskManager(this.metricsFactory, streamConfig); @@ -461,9 +464,16 @@ public class Scheduler implements Runnable { newStreamConfigMap.putAll(multiStreamTracker.streamConfigList().stream() .collect(Collectors.toMap(sc -> sc.streamIdentifier(), sc -> sc))); - // This is done to ensure that we clean up the stale streams lingering in the lease table. - syncStreamsFromLeaseTableOnAppInit(); + List leases; + // This is done to ensure that we clean up the stale streams lingering in the lease table. 
+ if (!leasesSyncedOnAppInit && isMultiStreamMode) { + leases = fetchMultiStreamLeases(); + syncStreamsFromLeaseTableOnAppInit(leases); + leasesSyncedOnAppInit = true; + } + + // For new streams discovered, do a shard sync and update the currentStreamConfigMap for (StreamIdentifier streamIdentifier : newStreamConfigMap.keySet()) { if (!currentStreamConfigMap.containsKey(streamIdentifier)) { log.info("Found new stream to process: " + streamIdentifier + ". Syncing shards of that stream."); @@ -478,41 +488,117 @@ } } - // TODO: Remove assumption that each Worker gets the full list of streams + // Now, we are identifying the stale/old streams and enqueuing it for deferred deletion. + // It is assumed that all the workers will always have the latest and consistent snapshot of streams + // from the multiStreamTracker. + // + // The following streams transition state among two workers are NOT considered safe, where Worker 2, on + // initialization, learns about D from the lease table and deletes the leases for D, as it is not available + // in its latest MultiStreamTracker. + // Worker 1 : A,B,C -> A,B,C,D (latest) + // Worker 2 : BOOTS_UP -> A,B,C (stale) + // + // The following streams transition state among two workers are NOT considered safe, where Worker 2 might + // end up deleting the leases for A and D and lose progress made so far. + // Worker 1 : A,B,C -> A,B,C,D (latest) + // Worker 2 : A,B,C -> B,C (stale/partial) + // + // In order to give workers with stale stream info, sufficient time to learn about the new streams + // before attempting to delete it, we will be deferring the leases deletion based on the + // defer time period. 
+ Iterator currentStreamConfigIter = currentStreamConfigMap.keySet().iterator(); while (currentStreamConfigIter.hasNext()) { StreamIdentifier streamIdentifier = currentStreamConfigIter.next(); if (!newStreamConfigMap.containsKey(streamIdentifier)) { - log.info("Found old/deleted stream: " + streamIdentifier + ". Syncing shards of that stream."); - ShardSyncTaskManager shardSyncTaskManager = createOrGetShardSyncTaskManager(currentStreamConfigMap.get(streamIdentifier)); - shardSyncTaskManager.syncShardAndLeaseInfo(); - currentStreamConfigIter.remove(); - streamsSynced.add(streamIdentifier); + staleStreamDeletionMap.putIfAbsent(streamIdentifier, Instant.now()); } } + + // Now let's scan the streamIdentifiers eligible for deferred deletion and delete them. + // StreamIdentifiers are eligible for deletion only when the deferment period has elapsed and + // the streamIdentifiers are not present in the latest snapshot. + final Set streamIdsToBeDeleted = staleStreamDeletionMap.keySet().stream() + .filter(streamIdentifier -> + Duration.between(staleStreamDeletionMap.get(streamIdentifier), Instant.now()).toMillis() + >= getOldStreamDeferredDeletionPeriodMillis() && + !newStreamConfigMap.containsKey(streamIdentifier)) + .collect(Collectors.toSet()); + + streamsSynced.addAll(deleteMultiStreamLeases(streamIdsToBeDeleted)); + streamSyncWatch.reset().start(); } return streamsSynced; } + @VisibleForTesting + long getOldStreamDeferredDeletionPeriodMillis() { + return OLD_STREAM_DEFERRED_DELETION_PERIOD_MILLIS; + } + @VisibleForTesting boolean shouldSyncStreamsNow() { return isMultiStreamMode && (streamSyncWatch.elapsed(TimeUnit.MILLISECONDS) > NEW_STREAM_CHECK_INTERVAL_MILLIS); } - private void syncStreamsFromLeaseTableOnAppInit() + private void syncStreamsFromLeaseTableOnAppInit(List leases) { + final Set streamIdentifiers = leases.stream() + .map(lease -> StreamIdentifier.multiStreamInstance(lease.streamIdentifier())) + .collect(Collectors.toSet()); + for (StreamIdentifier 
streamIdentifier : streamIdentifiers) { + if (!currentStreamConfigMap.containsKey(streamIdentifier)) { + currentStreamConfigMap.put(streamIdentifier, getDefaultStreamConfig(streamIdentifier)); + } + } + } + + private List fetchMultiStreamLeases() throws DependencyException, ProvisionedThroughputException, InvalidStateException { - if (!leasesSyncedOnAppInit && isMultiStreamMode) { - final Set streamIdentifiers = leaseCoordinator.leaseRefresher().listLeases().stream() - .map(lease -> StreamIdentifier.multiStreamInstance(((MultiStreamLease) lease).streamIdentifier())) - .collect(Collectors.toSet()); - for (StreamIdentifier streamIdentifier : streamIdentifiers) { - if (!currentStreamConfigMap.containsKey(streamIdentifier)) { - currentStreamConfigMap.put(streamIdentifier, getDefaultStreamConfig(streamIdentifier)); + return (List) ((List) leaseCoordinator.leaseRefresher().listLeases()); + } + + private Set deleteMultiStreamLeases(Set streamIdentifiers) + throws DependencyException, ProvisionedThroughputException, InvalidStateException { + final Set streamsSynced = new HashSet<>(); + List leases = null; + Map> streamIdToShardsMap = null; + for(StreamIdentifier streamIdentifier : streamIdentifiers) { + if (leases == null) { + // Lazy Load once and use many times for this iteration. + leases = fetchMultiStreamLeases(); + } + if (streamIdToShardsMap == null) { + // Lazy load once and use many times for this iteration. + streamIdToShardsMap = leases.stream().collect(Collectors + .groupingBy(MultiStreamLease::streamIdentifier, + Collectors.toCollection(ArrayList::new))); + } + log.warn("Found old/deleted stream: " + streamIdentifier + ". Deleting leases of this stream."); + // Deleting leases will cause the workers to shutdown the record processors for these shards. 
+ if (deleteMultiStreamLeases(streamIdToShardsMap.get(streamIdentifier.serialize()))) { + currentStreamConfigMap.remove(streamIdentifier); + staleStreamDeletionMap.remove(streamIdentifier); + streamsSynced.add(streamIdentifier); + } + } + return streamsSynced; + } + + private boolean deleteMultiStreamLeases(List leases) { + if (leases != null) { + for (MultiStreamLease lease : leases) { + try { + leaseRefresher.deleteLease(lease); + } catch (DependencyException | InvalidStateException | ProvisionedThroughputException e) { + log.error( + "Unable to delete stale stream lease {}. Skipping further deletions for this stream. Will retry later.", + lease.leaseKey(), e); + return false; } } - leasesSyncedOnAppInit = true; } + return true; } // When a stream is no longer needed to be tracked, return a default StreamConfig with LATEST for faster shard end. @@ -549,6 +635,7 @@ public class Scheduler implements Runnable { * Requests a graceful shutdown of the worker, notifying record processors, that implement * {@link ShutdownNotificationAware}, of the impending shutdown. This gives the record processor a final chance to * checkpoint. + * * This will only create a single shutdown future. Additional attempts to start a graceful shutdown will return the * previous future. * @@ -575,8 +662,8 @@ public class Scheduler implements Runnable { * * * @return a future that will be set once the shutdown has completed. True indicates that the graceful shutdown - * completed successfully. A false value indicates that a non-exception case caused the shutdown process to - * terminate early. + * completed successfully. A false value indicates that a non-exception case caused the shutdown process to + * terminate early. */ public Future startGracefulShutdown() { synchronized (this) { @@ -593,8 +680,9 @@ public class Scheduler implements Runnable { * shutdowns in your own executor, or execute the shutdown synchronously. * * @return a callable that run the graceful shutdown process. 
This may return a callable that return true if the - * graceful shutdown has already been completed. - * @throws IllegalStateException thrown by the callable if another callable has already started the shutdown process. + * graceful shutdown has already been completed. + * @throws IllegalStateException + * thrown by the callable if another callable has already started the shutdown process. */ public Callable createGracefulShutdownCallable() { if (shutdownComplete()) { @@ -736,7 +824,8 @@ public class Scheduler implements Runnable { /** * NOTE: This method is internal/private to the Worker class. It has package access solely for testing. * - * @param shardInfo Kinesis shard info + * @param shardInfo + * Kinesis shard info * @return ShardConsumer for the shard */ ShardConsumer createOrGetShardConsumer(@NonNull final ShardInfo shardInfo, @@ -764,7 +853,7 @@ public class Scheduler implements Runnable { @NonNull final ShardRecordProcessorFactory shardRecordProcessorFactory) { RecordsPublisher cache = retrievalConfig.retrievalFactory().createGetRecordsCache(shardInfo, metricsFactory); ShardRecordProcessorCheckpointer checkpointer = coordinatorConfig.coordinatorFactory().createRecordProcessorCheckpointer(shardInfo, - checkpoint); + checkpoint); // The only case where streamName is not available will be when multistreamtracker not set. In this case, // get the default stream name for the single stream application. final StreamIdentifier streamIdentifier = getStreamIdentifier(shardInfo.streamIdentifierSerOpt()); @@ -801,6 +890,7 @@ public class Scheduler implements Runnable { /** * NOTE: This method is internal/private to the Worker class. It has package access solely for testing. + * * This method relies on ShardInfo.equals() method returning true for ShardInfo objects which may have been * instantiated with parentShardIds in a different order (and rest of the fields being the equal). 
For example * shardInfo1.equals(shardInfo2) should return true with shardInfo1 and shardInfo2 defined as follows. ShardInfo diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewer.java index 04d987a2..a1e0afcc 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewer.java @@ -36,9 +36,11 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import software.amazon.awssdk.services.cloudwatch.model.StandardUnit; import software.amazon.kinesis.annotations.KinesisClientInternalApi; +import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.LeaseRenewer; +import software.amazon.kinesis.leases.MultiStreamLease; import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.InvalidStateException; import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; @@ -297,6 +299,10 @@ public class DynamoDBLeaseRenewer implements LeaseRenewer { final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, operation); if (StringUtils.isNotEmpty(shardId)) { + if(lease instanceof MultiStreamLease) { + MetricsUtil.addStreamId(scope, + StreamIdentifier.multiStreamInstance(((MultiStreamLease) lease).streamIdentifier())); + } MetricsUtil.addShardId(scope, shardId); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java index fd036c9f..f576154a 100644 --- 
a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java @@ -23,6 +23,7 @@ import software.amazon.awssdk.services.cloudwatch.model.StandardUnit; import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; +import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; @@ -109,6 +110,8 @@ public class ProcessTask implements ConsumerTask { @Override public TaskResult call() { final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, PROCESS_TASK_OPERATION); + shardInfo.streamIdentifierSerOpt() + .ifPresent(streamId -> MetricsUtil.addStreamId(scope, StreamIdentifier.multiStreamInstance(streamId))); MetricsUtil.addShardId(scope, shardInfo.shardId()); long startTimeMillis = System.currentTimeMillis(); boolean success = false; @@ -197,6 +200,8 @@ public class ProcessTask implements ConsumerTask { .checkpointer(recordProcessorCheckpointer).millisBehindLatest(input.millisBehindLatest()).build(); final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, PROCESS_TASK_OPERATION); + shardInfo.streamIdentifierSerOpt() + .ifPresent(streamId -> MetricsUtil.addStreamId(scope, StreamIdentifier.multiStreamInstance(streamId))); MetricsUtil.addShardId(scope, shardInfo.shardId()); final long startTime = System.currentTimeMillis(); try { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/metrics/MetricsUtil.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/metrics/MetricsUtil.java index c6f2fe6f..a2f9d84b 100644 --- 
a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/metrics/MetricsUtil.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/metrics/MetricsUtil.java @@ -19,6 +19,7 @@ import org.apache.commons.lang3.StringUtils; import lombok.NonNull; import software.amazon.awssdk.services.cloudwatch.model.StandardUnit; +import software.amazon.kinesis.common.StreamIdentifier; /** * @@ -26,6 +27,7 @@ import software.amazon.awssdk.services.cloudwatch.model.StandardUnit; public class MetricsUtil { public static final String OPERATION_DIMENSION_NAME = "Operation"; public static final String SHARD_ID_DIMENSION_NAME = "ShardId"; + public static final String STREAM_IDENTIFIER = "StreamId"; private static final String WORKER_IDENTIFIER_DIMENSION = "WorkerIdentifier"; private static final String TIME_METRIC = "Time"; private static final String SUCCESS_METRIC = "Success"; @@ -51,6 +53,11 @@ public class MetricsUtil { addOperation(metricsScope, SHARD_ID_DIMENSION_NAME, shardId); } + public static void addStreamId(@NonNull final MetricsScope metricsScope, @NonNull final StreamIdentifier streamId) { + streamId.accountIdOptional() + .ifPresent(acc -> addOperation(metricsScope, STREAM_IDENTIFIER, streamId.serialize())); + } + public static void addWorkerIdentifier(@NonNull final MetricsScope metricsScope, @NonNull final String workerIdentifier) { addOperation(metricsScope, WORKER_IDENTIFIER_DIMENSION, workerIdentifier); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java index 82b31915..0b7df88d 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java @@ -232,6 +232,7 @@ public class KinesisDataFetcher implements DataFetcher { // TODO: 
Check if this metric is fine to be added final MetricsScope metricsScope = MetricsUtil.createMetricsWithOperation(metricsFactory, OPERATION); + MetricsUtil.addStreamId(metricsScope, streamIdentifier); MetricsUtil.addShardId(metricsScope, shardId); boolean success = false; long startTime = System.currentTimeMillis(); @@ -315,6 +316,7 @@ public class KinesisDataFetcher implements DataFetcher { GetRecordsRequest request = getGetRecordsRequest(nextIterator); final MetricsScope metricsScope = MetricsUtil.createMetricsWithOperation(metricsFactory, OPERATION); + MetricsUtil.addStreamId(metricsScope, streamIdentifier); MetricsUtil.addShardId(metricsScope, shardId); boolean success = false ; long startTime = System.currentTimeMillis(); @@ -325,7 +327,7 @@ public class KinesisDataFetcher implements DataFetcher { } catch (ExecutionException e) { throw exceptionManager.apply(e.getCause()); } catch (InterruptedException e) { - // TODO: Check behavior + // TODO: Check behavior log.debug("{} : Interrupt called on method, shutdown initiated", streamAndShardId); throw new RuntimeException(e); } catch (TimeoutException e) { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java index 4a0c75eb..c80c9860 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java @@ -45,6 +45,7 @@ import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.RequestDetails; +import software.amazon.kinesis.common.StreamIdentifier; import 
software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.metrics.MetricsLevel; @@ -88,6 +89,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { private final DefaultGetRecordsCacheDaemon defaultGetRecordsCacheDaemon; private boolean started = false; private final String operation; + private final StreamIdentifier streamId; private final String streamAndShardId; private Subscriber subscriber; @VisibleForTesting @Getter @@ -219,8 +221,8 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { this.defaultGetRecordsCacheDaemon = new DefaultGetRecordsCacheDaemon(); Validate.notEmpty(operation, "Operation cannot be empty"); this.operation = operation; - this.streamAndShardId = - this.getRecordsRetrievalStrategy.dataFetcher().getStreamIdentifier().serialize() + ":" + shardId; + this.streamId = this.getRecordsRetrievalStrategy.dataFetcher().getStreamIdentifier(); + this.streamAndShardId = this.streamId.serialize() + ":" + shardId; } @Override @@ -453,6 +455,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { log.info("{} : records threw ExpiredIteratorException - restarting" + " after greatest seqNum passed to customer", streamAndShardId, e); + MetricsUtil.addStreamId(scope, streamId); scope.addData(EXPIRED_ITERATOR_METRIC, 1, StandardUnit.COUNT, MetricsLevel.SUMMARY); publisherSession.dataFetcher().restartIterator(); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index 1de7b101..96cec37a 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -36,6 +36,7 @@ import static org.mockito.Mockito.verify; import static 
org.mockito.Mockito.when; import static org.mockito.internal.verification.VerificationModeFactory.atMost; +import java.time.Instant; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -443,7 +444,7 @@ public class SchedulerTest { } @Test - public final void testMultiStreamOnlyStaleStreamsAreSynced() + public final void testMultiStreamStaleStreamsAreNotDeletedImmediately() throws DependencyException, ProvisionedThroughputException, InvalidStateException { List streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig( StreamIdentifier.multiStreamInstance( @@ -462,16 +463,49 @@ public class SchedulerTest { metricsConfig, processorConfig, retrievalConfig)); when(scheduler.shouldSyncStreamsNow()).thenReturn(true); Set syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases(); + Set expectedPendingStreams = IntStream.range(1, 3).mapToObj(streamId -> StreamIdentifier.multiStreamInstance( + Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))).collect( + Collectors.toCollection(HashSet::new)); + Assert.assertEquals(Sets.newHashSet(), syncedStreams); + Assert.assertEquals(Sets.newHashSet(streamConfigList1), + Sets.newHashSet(scheduler.currentStreamConfigMap().values())); + Assert.assertEquals(expectedPendingStreams, + scheduler.staleStreamDeletionMap().keySet()); + } + + @Test + public final void testMultiStreamStaleStreamsAreDeletedAfterDefermentPeriod() + throws DependencyException, ProvisionedThroughputException, InvalidStateException { + List streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig( + StreamIdentifier.multiStreamInstance( + Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)), + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST))) + .collect(Collectors.toCollection(LinkedList::new)); + List streamConfigList2 = IntStream.range(3, 5).mapToObj(streamId -> new 
StreamConfig( + StreamIdentifier.multiStreamInstance( + Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)), + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST))) + .collect(Collectors.toCollection(LinkedList::new)); + retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName) + .retrievalFactory(retrievalFactory); + when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList1, streamConfigList2); + scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, + metricsConfig, processorConfig, retrievalConfig)); + when(scheduler.shouldSyncStreamsNow()).thenReturn(true); + when(scheduler.getOldStreamDeferredDeletionPeriodMillis()).thenReturn(0L); + Set syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases(); Set expectedSyncedStreams = IntStream.range(1, 3).mapToObj(streamId -> StreamIdentifier.multiStreamInstance( Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))).collect( Collectors.toCollection(HashSet::new)); Assert.assertEquals(expectedSyncedStreams, syncedStreams); Assert.assertEquals(Sets.newHashSet(streamConfigList2), Sets.newHashSet(scheduler.currentStreamConfigMap().values())); + Assert.assertEquals(Sets.newHashSet(), + scheduler.staleStreamDeletionMap().keySet()); } @Test - public final void testMultiStreamSyncOnlyNewAndStaleStreamsAreSynced() + public final void testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediately() throws DependencyException, ProvisionedThroughputException, InvalidStateException { List streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig( StreamIdentifier.multiStreamInstance( @@ -490,6 +524,47 @@ public class SchedulerTest { metricsConfig, processorConfig, retrievalConfig)); when(scheduler.shouldSyncStreamsNow()).thenReturn(true); Set syncedStreams = 
scheduler.checkAndSyncStreamShardsAndLeases(); + Set expectedSyncedStreams = IntStream.range(5, 7) + .mapToObj(streamId -> StreamIdentifier.multiStreamInstance( + Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))) + .collect(Collectors.toCollection(HashSet::new)); + Set expectedPendingStreams = IntStream.range(1, 3) + .mapToObj(streamId -> StreamIdentifier.multiStreamInstance( + Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))) + .collect(Collectors.toCollection(HashSet::new)); + Assert.assertEquals(expectedSyncedStreams, syncedStreams); + List expectedCurrentStreamConfigs = IntStream.range(1, 7).mapToObj(streamId -> new StreamConfig( + StreamIdentifier.multiStreamInstance( + Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)), + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST))) + .collect(Collectors.toCollection(LinkedList::new)); + Assert.assertEquals(Sets.newHashSet(expectedCurrentStreamConfigs), + Sets.newHashSet(scheduler.currentStreamConfigMap().values())); + Assert.assertEquals(expectedPendingStreams, + scheduler.staleStreamDeletionMap().keySet()); + } + + @Test + public final void testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreDeletedAfterDefermentPeriod() + throws DependencyException, ProvisionedThroughputException, InvalidStateException { + List streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig( + StreamIdentifier.multiStreamInstance( + Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)), + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST))) + .collect(Collectors.toCollection(LinkedList::new)); + List streamConfigList2 = IntStream.range(3, 7).mapToObj(streamId -> new StreamConfig( + StreamIdentifier.multiStreamInstance( + Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + 
streamId, streamId * 12345)), + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST))) + .collect(Collectors.toCollection(LinkedList::new)); + retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName) + .retrievalFactory(retrievalFactory); + when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList1, streamConfigList2); + scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, + metricsConfig, processorConfig, retrievalConfig)); + when(scheduler.shouldSyncStreamsNow()).thenReturn(true); + when(scheduler.getOldStreamDeferredDeletionPeriodMillis()).thenReturn(0L); + Set syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases(); Set expectedSyncedStreams = IntStream.concat(IntStream.range(1, 3), IntStream.range(5, 7)) .mapToObj(streamId -> StreamIdentifier.multiStreamInstance( Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))) @@ -497,6 +572,8 @@ public class SchedulerTest { Assert.assertEquals(expectedSyncedStreams, syncedStreams); Assert.assertEquals(Sets.newHashSet(streamConfigList2), Sets.newHashSet(scheduler.currentStreamConfigMap().values())); + Assert.assertEquals(Sets.newHashSet(), + scheduler.staleStreamDeletionMap().keySet()); } @Test