diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index e0aba9fb..e621376c 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -159,6 +159,7 @@ public class Scheduler implements Runnable { private final Map currentStreamConfigMap; private MultiStreamTracker multiStreamTracker; private FormerStreamsLeasesDeletionStrategy formerStreamsLeasesDeletionStrategy; + private InitialPositionInStreamExtended orphanedStreamInitialPositionInStream; private final long listShardsBackoffTimeMillis; private final int maxListShardsRetryAttempts; private final LeaseRefresher leaseRefresher; @@ -231,6 +232,7 @@ public class Scheduler implements Runnable { multiStreamTracker -> { this.multiStreamTracker = multiStreamTracker; this.formerStreamsLeasesDeletionStrategy = multiStreamTracker.formerStreamsLeasesDeletionStrategy(); + this.orphanedStreamInitialPositionInStream = multiStreamTracker.orphanedStreamInitialPositionInStream(); return multiStreamTracker.streamConfigList().stream() .collect(Collectors.toMap(sc -> sc.streamIdentifier(), sc -> sc)); }, @@ -512,7 +514,7 @@ public class Scheduler implements Runnable { // Worker 2 : BOOTS_UP -> A,B,C (stale) // // The following streams transition state among two workers are NOT considered safe, where Worker 2 might - // end up deleting the leases for A and D and loose progress made so far. + // end up deleting the leases for A and D and lose progress made so far. // Worker 1 : A,B,C -> A,B,C,D (latest) // Worker 2 : A,B,C -> B,C (stale/partial) // @@ -588,13 +590,13 @@ public class Scheduler implements Runnable { (streamSyncWatch.elapsed(TimeUnit.MILLISECONDS) > NEW_STREAM_CHECK_INTERVAL_MILLIS); } - private void syncStreamsFromLeaseTableOnAppInit(List leases) { + @VisibleForTesting void syncStreamsFromLeaseTableOnAppInit(List leases) { final Set streamIdentifiers = leases.stream() .map(lease -> StreamIdentifier.multiStreamInstance(lease.streamIdentifier())) .collect(Collectors.toSet()); for (StreamIdentifier streamIdentifier : streamIdentifiers) { if (!currentStreamConfigMap.containsKey(streamIdentifier)) { - currentStreamConfigMap.put(streamIdentifier, getDefaultStreamConfig(streamIdentifier)); + currentStreamConfigMap.put(streamIdentifier, getOrphanedStreamConfig(streamIdentifier)); } } } @@ -653,9 +655,9 @@ public class Scheduler implements Runnable { return true; } - // When a stream is no longer needed to be tracked, return a default StreamConfig with LATEST for faster shard end. - private StreamConfig getDefaultStreamConfig(StreamIdentifier streamIdentifier) { - return new StreamConfig(streamIdentifier, InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)); + // Generate default StreamConfig for an "orphaned" stream that is in the lease table but not tracked + private StreamConfig getOrphanedStreamConfig(StreamIdentifier streamIdentifier) { + return new StreamConfig(streamIdentifier, orphanedStreamInitialPositionInStream); } /** @@ -917,7 +919,7 @@ public class Scheduler implements Runnable { // Irrespective of single stream app or multi stream app, streamConfig should always be available. // If we have a shardInfo, that is not present in currentStreamConfigMap for whatever reason, then return default stream config // to gracefully complete the reading. - final StreamConfig streamConfig = currentStreamConfigMap.getOrDefault(streamIdentifier, getDefaultStreamConfig(streamIdentifier)); + final StreamConfig streamConfig = currentStreamConfigMap.getOrDefault(streamIdentifier, getOrphanedStreamConfig(streamIdentifier)); Validate.notNull(streamConfig, "StreamConfig should not be null"); RecordsPublisher cache = retrievalConfig.retrievalFactory().createGetRecordsCache(shardInfo, streamConfig, metricsFactory); ShardConsumerArgument argument = new ShardConsumerArgument(shardInfo, diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/MultiStreamTracker.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/MultiStreamTracker.java index 1b742509..7e878e2c 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/MultiStreamTracker.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/MultiStreamTracker.java @@ -15,6 +15,8 @@ package software.amazon.kinesis.processor; +import software.amazon.kinesis.common.InitialPositionInStream; +import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.StreamConfig; import java.util.List; @@ -42,4 +44,14 @@ public interface MultiStreamTracker { * @return StreamsLeasesDeletionStrategy */ FormerStreamsLeasesDeletionStrategy formerStreamsLeasesDeletionStrategy(); + + /** + * The position for getting records from an "orphaned" stream that is in the lease table but not tracked + * Default assumes that the stream no longer need to be tracked, so use LATEST for faster shard end. + * + *

Default value: {@link InitialPositionInStream#LATEST}

+ */ + default InitialPositionInStreamExtended orphanedStreamInitialPositionInStream() { + return InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST); + } } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index a066ece0..5ff7596c 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -40,6 +40,7 @@ import static software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrat import java.time.Duration; import java.util.ArrayList; import java.util.Collections; +import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; @@ -53,12 +54,14 @@ import java.util.stream.IntStream; import com.google.common.base.Joiner; import com.google.common.collect.Sets; import io.reactivex.plugins.RxJavaPlugins; +import lombok.NoArgsConstructor; import lombok.RequiredArgsConstructor; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; +import org.mockito.Spy; import org.mockito.runners.MockitoJUnitRunner; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; @@ -79,6 +82,7 @@ import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseManagementConfig; import software.amazon.kinesis.leases.LeaseManagementFactory; import software.amazon.kinesis.leases.LeaseRefresher; +import software.amazon.kinesis.leases.MultiStreamLease; import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.ShardSyncTaskManager; @@ -97,6 +101,7 @@ import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.metrics.MetricsConfig; import software.amazon.kinesis.processor.Checkpointer; +import software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy; import software.amazon.kinesis.processor.MultiStreamTracker; import software.amazon.kinesis.processor.ProcessorConfig; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; @@ -152,8 +157,8 @@ public class SchedulerTest { private Checkpointer checkpoint; @Mock private WorkerStateChangeListener workerStateChangeListener; - @Mock - private MultiStreamTracker multiStreamTracker; + @Spy + private TestMultiStreamTracker multiStreamTracker; @Mock private LeaseCleanupManager leaseCleanupManager; @@ -175,25 +180,6 @@ public class SchedulerTest { processorConfig = new ProcessorConfig(shardRecordProcessorFactory); retrievalConfig = new RetrievalConfig(kinesisClient, streamName, applicationName) .retrievalFactory(retrievalFactory); - - final List streamConfigList = new ArrayList() {{ - add(new StreamConfig(StreamIdentifier.multiStreamInstance("acc1:stream1:1"), InitialPositionInStreamExtended.newInitialPosition( - InitialPositionInStream.LATEST))); - add(new StreamConfig(StreamIdentifier.multiStreamInstance("acc1:stream2:2"), InitialPositionInStreamExtended.newInitialPosition( - InitialPositionInStream.LATEST))); - add(new StreamConfig(StreamIdentifier.multiStreamInstance("acc2:stream1:1"), InitialPositionInStreamExtended.newInitialPosition( - InitialPositionInStream.LATEST))); - add(new StreamConfig(StreamIdentifier.multiStreamInstance("acc2:stream2:3"), InitialPositionInStreamExtended.newInitialPosition( - InitialPositionInStream.LATEST))); - }}; - - when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList); - when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()) - .thenReturn(new AutoDetectionAndDeferredDeletionStrategy() { - @Override public Duration waitPeriodToDeleteFormerStreams() { - return Duration.ZERO; - } - }); when(leaseCoordinator.leaseRefresher()).thenReturn(dynamoDBLeaseRefresher); when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector); when(shardSyncTaskManager.hierarchicalShardSyncer()).thenReturn(new HierarchicalShardSyncer()); @@ -460,6 +446,75 @@ public class SchedulerTest { Sets.newHashSet(scheduler.currentStreamConfigMap().values())); } + @Test + public final void testMultiStreamSyncFromTableDefaultInitPos() + throws DependencyException, ProvisionedThroughputException, InvalidStateException { + // Streams in lease table but not tracked by multiStreamTracker + List leasesInTable = IntStream.range(1, 3).mapToObj(streamId -> new MultiStreamLease() + .streamIdentifier( + Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)) + .shardId("some_random_shard_id")) + .collect(Collectors.toCollection(LinkedList::new)); + // Include a stream that is already tracked by multiStreamTracker, just to make sure we will not touch this stream config later + leasesInTable.add(new MultiStreamLease().streamIdentifier("acc1:stream1:1").shardId("some_random_shard_id")); + + // Expected StreamConfig after running syncStreamsFromLeaseTableOnAppInit + // By default, Stream not present in multiStreamTracker will have initial position of LATEST + List expectedConfig = IntStream.range(1, 3).mapToObj(streamId -> new StreamConfig( + StreamIdentifier.multiStreamInstance( + Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)), + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST))) + .collect(Collectors.toCollection(LinkedList::new)); + // Include default configs + expectedConfig.addAll(multiStreamTracker.streamConfigList()); + + retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName) + .retrievalFactory(retrievalFactory); + scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, + metricsConfig, processorConfig, retrievalConfig); + scheduler.syncStreamsFromLeaseTableOnAppInit(leasesInTable); + Map expectedConfigMap = expectedConfig.stream().collect(Collectors.toMap( + sc -> sc.streamIdentifier(), sc -> sc)); + Assert.assertEquals(expectedConfigMap, scheduler.currentStreamConfigMap()); + } + + @Test + public final void testMultiStreamSyncFromTableCustomInitPos() + throws DependencyException, ProvisionedThroughputException, InvalidStateException { + Date testTimeStamp = new Date(); + + // Streams in lease table but not tracked by multiStreamTracker + List leasesInTable = IntStream.range(1, 3).mapToObj(streamId -> new MultiStreamLease() + .streamIdentifier( + Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)) + .shardId("some_random_shard_id")) + .collect(Collectors.toCollection(LinkedList::new)); + // Include a stream that is already tracked by multiStreamTracker, just to make sure we will not touch this stream config later + leasesInTable.add(new MultiStreamLease().streamIdentifier("acc1:stream1:1").shardId("some_random_shard_id")); + + // Expected StreamConfig after running syncStreamsFromLeaseTableOnAppInit + // Stream not present in multiStreamTracker will have initial position specified by orphanedStreamInitialPositionInStream + List expectedConfig = IntStream.range(1, 3).mapToObj(streamId -> new StreamConfig( + StreamIdentifier.multiStreamInstance( + Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)), + InitialPositionInStreamExtended.newInitialPositionAtTimestamp(testTimeStamp))) + .collect(Collectors.toCollection(LinkedList::new)); + // Include default configs + expectedConfig.addAll(multiStreamTracker.streamConfigList()); + + // Mock a specific orphanedStreamInitialPositionInStream specified in multiStreamTracker + when(multiStreamTracker.orphanedStreamInitialPositionInStream()).thenReturn( + InitialPositionInStreamExtended.newInitialPositionAtTimestamp(testTimeStamp)); + retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName) + .retrievalFactory(retrievalFactory); + scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, + metricsConfig, processorConfig, retrievalConfig); + scheduler.syncStreamsFromLeaseTableOnAppInit(leasesInTable); + Map expectedConfigMap = expectedConfig.stream().collect(Collectors.toMap( + sc -> sc.streamIdentifier(), sc -> sc)); + Assert.assertEquals(expectedConfigMap, scheduler.currentStreamConfigMap()); + } + @Test public final void testMultiStreamStaleStreamsAreNotDeletedImmediatelyAutoDeletionStrategy() throws DependencyException, ProvisionedThroughputException, InvalidStateException { @@ -1111,4 +1166,32 @@ public class SchedulerTest { } } + // TODO: Upgrade to mockito >= 2.7.13, and use Spy on MultiStreamTracker to directly access the default methods without implementing TestMultiStreamTracker class + @NoArgsConstructor + private class TestMultiStreamTracker implements MultiStreamTracker { + @Override + public List streamConfigList(){ + return new ArrayList() {{ + add(new StreamConfig(StreamIdentifier.multiStreamInstance("acc1:stream1:1"), InitialPositionInStreamExtended.newInitialPosition( + InitialPositionInStream.LATEST))); + add(new StreamConfig(StreamIdentifier.multiStreamInstance("acc1:stream2:2"), InitialPositionInStreamExtended.newInitialPosition( + InitialPositionInStream.LATEST))); + add(new StreamConfig(StreamIdentifier.multiStreamInstance("acc2:stream1:1"), InitialPositionInStreamExtended.newInitialPosition( + InitialPositionInStream.LATEST))); + add(new StreamConfig(StreamIdentifier.multiStreamInstance("acc2:stream2:3"), InitialPositionInStreamExtended.newInitialPosition( + InitialPositionInStream.LATEST))); + }}; + } + + @Override + public FormerStreamsLeasesDeletionStrategy formerStreamsLeasesDeletionStrategy(){ + return new AutoDetectionAndDeferredDeletionStrategy() { + @Override + public Duration waitPeriodToDeleteFormerStreams() { + return Duration.ZERO; + } + }; + } + } + }