From 17b82a0e6766c0a70d6a52a2f28f2cd59d7ffe75 Mon Sep 17 00:00:00 2001 From: stair <123031771+stair-aws@users.noreply.github.com> Date: Tue, 7 Feb 2023 02:04:34 -0500 Subject: [PATCH] Refactored `MultiStreamTracker` to provide and enhance OOP for both (#1028) single- and multi-stream trackers. + converted `Scheduler#currentStreamConfigMap` to `ConcurrentHashMap` + eliminated a responsibility from Scheduler (i.e., orphan config generation) --- .../amazon/kinesis/common/ConfigsBuilder.java | 59 ++++++++---- .../kinesis/common/DeprecationUtils.java | 53 +++++++++++ .../amazon/kinesis/coordinator/Scheduler.java | 62 ++++--------- .../kinesis/processor/MultiStreamTracker.java | 41 ++------- .../processor/SingleStreamTracker.java | 81 +++++++++++++++++ .../kinesis/processor/StreamTracker.java | 85 ++++++++++++++++++ .../kinesis/retrieval/RetrievalConfig.java | 46 +++++++--- .../kinesis/common/ConfigsBuilderTest.java | 89 +++++++++++++++++++ .../kinesis/common/DeprecationUtilsTest.java | 45 ++++++++++ .../processor/SingleStreamTrackerTest.java | 66 ++++++++++++++ .../retrieval/RetrievalConfigTest.java | 85 ++++++++++++++++++ 11 files changed, 605 insertions(+), 107 deletions(-) create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/DeprecationUtils.java create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/SingleStreamTracker.java create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/StreamTracker.java create mode 100644 amazon-kinesis-client/src/test/java/software/amazon/kinesis/common/ConfigsBuilderTest.java create mode 100644 amazon-kinesis-client/src/test/java/software/amazon/kinesis/common/DeprecationUtilsTest.java create mode 100644 amazon-kinesis-client/src/test/java/software/amazon/kinesis/processor/SingleStreamTrackerTest.java create mode 100644 amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/RetrievalConfigTest.java diff --git 
a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/ConfigsBuilder.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/ConfigsBuilder.java index 09d28495..a5bbfebe 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/ConfigsBuilder.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/ConfigsBuilder.java @@ -15,6 +15,8 @@ package software.amazon.kinesis.common; +import java.util.function.Function; + import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.Setter; @@ -35,6 +37,8 @@ import software.amazon.kinesis.metrics.MetricsConfig; import software.amazon.kinesis.processor.ProcessorConfig; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; import software.amazon.kinesis.processor.MultiStreamTracker; +import software.amazon.kinesis.processor.SingleStreamTracker; +import software.amazon.kinesis.processor.StreamTracker; import software.amazon.kinesis.retrieval.RetrievalConfig; /** @@ -46,9 +50,18 @@ public class ConfigsBuilder { /** * Either the name of the stream to consume records from * Or MultiStreamTracker for all the streams to consume records from + * + * @deprecated Both single- and multi-stream support is now provided by {@link StreamTracker}. + * @see #streamTracker */ + @Deprecated private Either appStreamTracker; + /** + * Stream(s) to be consumed by this KCL application. + */ + private StreamTracker streamTracker; + /** * Application name for the KCL Worker */ @@ -115,7 +128,8 @@ public class ConfigsBuilder { } /** - * Constructor to initialize ConfigsBuilder with StreamName + * Constructor to initialize ConfigsBuilder for a single stream. 
+ * * @param streamName * @param applicationName * @param kinesisClient @@ -128,18 +142,19 @@ public class ConfigsBuilder { @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { - this.appStreamTracker = Either.right(streamName); - this.applicationName = applicationName; - this.kinesisClient = kinesisClient; - this.dynamoDBClient = dynamoDBClient; - this.cloudWatchClient = cloudWatchClient; - this.workerIdentifier = workerIdentifier; - this.shardRecordProcessorFactory = shardRecordProcessorFactory; + this(new SingleStreamTracker(streamName), + applicationName, + kinesisClient, + dynamoDBClient, + cloudWatchClient, + workerIdentifier, + shardRecordProcessorFactory); } /** - * Constructor to initialize ConfigsBuilder with MultiStreamTracker - * @param multiStreamTracker + * Constructor to initialize ConfigsBuilder + * + * @param streamTracker tracker for single- or multi-stream processing * @param applicationName * @param kinesisClient * @param dynamoDBClient @@ -147,17 +162,30 @@ public class ConfigsBuilder { * @param workerIdentifier * @param shardRecordProcessorFactory */ - public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName, + public ConfigsBuilder(@NonNull StreamTracker streamTracker, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { - this.appStreamTracker = Either.left(multiStreamTracker); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; 
this.shardRecordProcessorFactory = shardRecordProcessorFactory; + + // construct both streamTracker and appStreamTracker + streamTracker(streamTracker); + } + + public void appStreamTracker(Either appStreamTracker) { + this.appStreamTracker = appStreamTracker; + streamTracker = appStreamTracker.map(Function.identity(), SingleStreamTracker::new); + } + + public void streamTracker(StreamTracker streamTracker) { + this.streamTracker = streamTracker; + this.appStreamTracker = DeprecationUtils.convert(streamTracker, + singleStreamTracker -> singleStreamTracker.streamConfigList().get(0).streamIdentifier().streamName()); } /** @@ -205,7 +233,6 @@ public class ConfigsBuilder { return new MetricsConfig(cloudWatchClient(), namespace()); } - /** * Creates a new instance of ProcessorConfig * @@ -221,10 +248,6 @@ public class ConfigsBuilder { * @return RetrievalConfig */ public RetrievalConfig retrievalConfig() { - final RetrievalConfig retrievalConfig = - appStreamTracker.map( - multiStreamTracker -> new RetrievalConfig(kinesisClient(), multiStreamTracker, applicationName()), - streamName -> new RetrievalConfig(kinesisClient(), streamName, applicationName())); - return retrievalConfig; + return new RetrievalConfig(kinesisClient(), streamTracker(), applicationName()); } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/DeprecationUtils.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/DeprecationUtils.java new file mode 100644 index 00000000..5d8782e0 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/DeprecationUtils.java @@ -0,0 +1,53 @@ +/* + * Copyright 2023 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package software.amazon.kinesis.common; + +import java.util.function.Function; + +import software.amazon.awssdk.utils.Either; +import software.amazon.kinesis.processor.MultiStreamTracker; +import software.amazon.kinesis.processor.SingleStreamTracker; +import software.amazon.kinesis.processor.StreamTracker; + +/** + * Utility methods to facilitate deprecated code until that deprecated code + * can be safely removed. + */ +public final class DeprecationUtils { + + private DeprecationUtils() { + throw new UnsupportedOperationException("utility class"); + } + + /** + * Converts a {@link StreamTracker} into the deprecated {@code Either} convention. 
+ * + * @param streamTracker tracker to convert + */ + @Deprecated + public static Either convert( + StreamTracker streamTracker, + Function converter) { + if (streamTracker instanceof MultiStreamTracker) { + return Either.left((MultiStreamTracker) streamTracker); + } else if (streamTracker instanceof SingleStreamTracker) { + return Either.right(converter.apply((SingleStreamTracker) streamTracker)); + } else { + throw new IllegalArgumentException("Unhandled StreamTracker: " + streamTracker); + } + } + +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index 5c877b80..8bc4fc98 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -53,7 +53,6 @@ import lombok.extern.slf4j.Slf4j; import software.amazon.awssdk.utils.Validate; import software.amazon.kinesis.checkpoint.CheckpointConfig; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; -import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.StreamConfig; import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.leases.HierarchicalShardSyncer; @@ -88,10 +87,10 @@ import software.amazon.kinesis.metrics.MetricsScope; import software.amazon.kinesis.metrics.MetricsUtil; import software.amazon.kinesis.processor.Checkpointer; import software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy; -import software.amazon.kinesis.processor.MultiStreamTracker; import software.amazon.kinesis.processor.ProcessorConfig; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; import software.amazon.kinesis.processor.ShutdownNotificationAware; +import software.amazon.kinesis.processor.StreamTracker; import 
software.amazon.kinesis.retrieval.AggregatorUtil; import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RetrievalConfig; @@ -138,7 +137,6 @@ public class Scheduler implements Runnable { private final ExecutorService executorService; private final DiagnosticEventFactory diagnosticEventFactory; private final DiagnosticEventHandler diagnosticEventHandler; - // private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; private final LeaseCoordinator leaseCoordinator; private final Function shardSyncTaskManagerProvider; private final Map streamToShardSyncTaskManagerMap = new HashMap<>(); @@ -152,10 +150,9 @@ public class Scheduler implements Runnable { private final long failoverTimeMillis; private final long taskBackoffTimeMillis; private final boolean isMultiStreamMode; - private final Map currentStreamConfigMap; - private MultiStreamTracker multiStreamTracker; - private FormerStreamsLeasesDeletionStrategy formerStreamsLeasesDeletionStrategy; - private InitialPositionInStreamExtended orphanedStreamInitialPositionInStream; + private final Map currentStreamConfigMap = new ConcurrentHashMap<>(); + private final StreamTracker streamTracker; + private final FormerStreamsLeasesDeletionStrategy formerStreamsLeasesDeletionStrategy; private final long listShardsBackoffTimeMillis; private final int maxListShardsRetryAttempts; private final LeaseRefresher leaseRefresher; @@ -222,23 +219,12 @@ public class Scheduler implements Runnable { this.retrievalConfig = retrievalConfig; this.applicationName = this.coordinatorConfig.applicationName(); - this.isMultiStreamMode = this.retrievalConfig.appStreamTracker().map( - multiStreamTracker -> true, streamConfig -> false); - this.currentStreamConfigMap = this.retrievalConfig.appStreamTracker().map( - multiStreamTracker -> { - this.multiStreamTracker = multiStreamTracker; - this.formerStreamsLeasesDeletionStrategy = multiStreamTracker.formerStreamsLeasesDeletionStrategy(); - 
this.orphanedStreamInitialPositionInStream = multiStreamTracker.orphanedStreamInitialPositionInStream(); - return multiStreamTracker.streamConfigList().stream() - .collect(Collectors.toMap(sc -> sc.streamIdentifier(), sc -> sc)); - }, - streamConfig -> { - // use a concrete, non-singleton map to allow computeIfAbsent(...) - // without forcing behavioral differences for multi-stream support - final Map map = new HashMap<>(); - map.put(streamConfig.streamIdentifier(), streamConfig); - return map; - }); + this.streamTracker = retrievalConfig.streamTracker(); + this.isMultiStreamMode = streamTracker.isMultiStream(); + this.formerStreamsLeasesDeletionStrategy = streamTracker.formerStreamsLeasesDeletionStrategy(); + streamTracker.streamConfigList().forEach( + sc -> currentStreamConfigMap.put(sc.streamIdentifier(), sc)); + this.maxInitializationAttempts = this.coordinatorConfig.maxInitializationAttempts(); this.metricsFactory = this.metricsConfig.metricsFactory(); // Determine leaseSerializer based on availability of MultiStreamTracker. 
@@ -464,12 +450,8 @@ public class Scheduler implements Runnable { final MetricsScope metricsScope = MetricsUtil.createMetricsWithOperation(metricsFactory, MULTI_STREAM_TRACKER); try { - - final Map newStreamConfigMap = new HashMap<>(); - final Duration waitPeriodToDeleteOldStreams = formerStreamsLeasesDeletionStrategy.waitPeriodToDeleteFormerStreams(); - // Making an immutable copy - newStreamConfigMap.putAll(multiStreamTracker.streamConfigList().stream() - .collect(Collectors.toMap(sc -> sc.streamIdentifier(), sc -> sc))); + final Map newStreamConfigMap = streamTracker.streamConfigList() + .stream().collect(Collectors.toMap(StreamConfig::streamIdentifier, Function.identity())); List leases; @@ -549,6 +531,8 @@ public class Scheduler implements Runnable { } } + final Duration waitPeriodToDeleteOldStreams = + formerStreamsLeasesDeletionStrategy.waitPeriodToDeleteFormerStreams(); // Now let's scan the streamIdentifiersForLeaseCleanup eligible for deferred deletion and delete them. // StreamIdentifiers are eligible for deletion only when the deferment period has elapsed and // the streamIdentifiersForLeaseCleanup are not present in the latest snapshot. @@ -594,7 +578,7 @@ public class Scheduler implements Runnable { .collect(Collectors.toSet()); for (StreamIdentifier streamIdentifier : streamIdentifiers) { if (!currentStreamConfigMap.containsKey(streamIdentifier)) { - currentStreamConfigMap.put(streamIdentifier, getOrphanedStreamConfig(streamIdentifier)); + currentStreamConfigMap.put(streamIdentifier, streamTracker.createStreamConfig(streamIdentifier)); } } } @@ -652,17 +636,6 @@ public class Scheduler implements Runnable { return true; } - /** - * Generates default StreamConfig for an "orphaned" stream that is in the lease table but not tracked. 
- * - * @param streamIdentifier stream for which an orphan config should be generated - */ - private StreamConfig getOrphanedStreamConfig(StreamIdentifier streamIdentifier) { - final StreamConfig orphanConfig = new StreamConfig(streamIdentifier, orphanedStreamInitialPositionInStream); - log.info("Identified as orphan: {}", orphanConfig); - return orphanConfig; - } - /** * Returns whether worker can shutdown immediately. Note that this method is called from Worker's {{@link #run()} * method before every loop run, so method must do minimum amount of work to not impact shard processing timings. @@ -922,7 +895,10 @@ public class Scheduler implements Runnable { // Irrespective of single stream app or multi stream app, streamConfig should always be available. // If we have a shardInfo, that is not present in currentStreamConfigMap for whatever reason, then return default stream config // to gracefully complete the reading. - final StreamConfig streamConfig = currentStreamConfigMap.computeIfAbsent(streamIdentifier, this::getOrphanedStreamConfig); + StreamConfig streamConfig = currentStreamConfigMap.get(streamIdentifier); + if (streamConfig == null) { + streamConfig = streamTracker.createStreamConfig(streamIdentifier); + } Validate.notNull(streamConfig, "StreamConfig should not be null"); RecordsPublisher cache = retrievalConfig.retrievalFactory().createGetRecordsCache(shardInfo, streamConfig, metricsFactory); ShardConsumerArgument argument = new ShardConsumerArgument(shardInfo, diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/MultiStreamTracker.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/MultiStreamTracker.java index 7e878e2c..ead38333 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/MultiStreamTracker.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/MultiStreamTracker.java @@ -15,43 +15,14 @@ package software.amazon.kinesis.processor; 
-import software.amazon.kinesis.common.InitialPositionInStream; -import software.amazon.kinesis.common.InitialPositionInStreamExtended; -import software.amazon.kinesis.common.StreamConfig; - -import java.util.List; - /** - * Interface for stream trackers. This is useful for KCL Workers that need - * to consume data from multiple streams. - * KCL will periodically probe this interface to learn about the new and old streams. + * Tracker for consuming multiple Kinesis streams. */ -public interface MultiStreamTracker { +public interface MultiStreamTracker extends StreamTracker { - /** - * Returns the list of stream config, to be processed by the current application. - * Note that the streams list CAN be changed during the application runtime. - * This method will be called periodically by the KCL to learn about the change in streams to process. - * - * @return List of StreamConfig - */ - List streamConfigList(); - - /** - * Strategy to delete leases of old streams in the lease table. - * Note that the strategy CANNOT be changed during the application runtime. - * - * @return StreamsLeasesDeletionStrategy - */ - FormerStreamsLeasesDeletionStrategy formerStreamsLeasesDeletionStrategy(); - - /** - * The position for getting records from an "orphaned" stream that is in the lease table but not tracked - * Default assumes that the stream no longer need to be tracked, so use LATEST for faster shard end. - * - *

<p>Default value: {@link InitialPositionInStream#LATEST}</p>

- */ - default InitialPositionInStreamExtended orphanedStreamInitialPositionInStream() { - return InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST); + @Override + default boolean isMultiStream() { + return true; } + } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/SingleStreamTracker.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/SingleStreamTracker.java new file mode 100644 index 00000000..703c4881 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/SingleStreamTracker.java @@ -0,0 +1,81 @@ +/* + * Copyright 2023 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package software.amazon.kinesis.processor; + +import java.util.Collections; +import java.util.List; + +import lombok.EqualsAndHashCode; +import lombok.NonNull; +import lombok.ToString; +import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.common.StreamConfig; +import software.amazon.kinesis.common.StreamIdentifier; + +/** + * Tracker for consuming a single Kinesis stream. + */ +@EqualsAndHashCode +@ToString +public class SingleStreamTracker implements StreamTracker { + + /** + * By default, single-stream applications should expect the target stream + * to exist for the duration of the application. 
Therefore, there is no + * expectation for the leases to be deleted mid-execution. + */ + private static final FormerStreamsLeasesDeletionStrategy NO_LEASE_DELETION = + new FormerStreamsLeasesDeletionStrategy.NoLeaseDeletionStrategy(); + + private final StreamIdentifier streamIdentifier; + + private final List streamConfigs; + + public SingleStreamTracker(String streamName) { + this(StreamIdentifier.singleStreamInstance(streamName)); + } + + public SingleStreamTracker(StreamIdentifier streamIdentifier) { + this(streamIdentifier, DEFAULT_POSITION_IN_STREAM); + } + + public SingleStreamTracker( + StreamIdentifier streamIdentifier, + @NonNull InitialPositionInStreamExtended initialPosition) { + this(streamIdentifier, new StreamConfig(streamIdentifier, initialPosition)); + } + + public SingleStreamTracker(@NonNull StreamIdentifier streamIdentifier, @NonNull StreamConfig streamConfig) { + this.streamIdentifier = streamIdentifier; + this.streamConfigs = Collections.singletonList(streamConfig); + } + + @Override + public List streamConfigList() { + return streamConfigs; + } + + @Override + public FormerStreamsLeasesDeletionStrategy formerStreamsLeasesDeletionStrategy() { + return NO_LEASE_DELETION; + } + + @Override + public boolean isMultiStream() { + return false; + } + +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/StreamTracker.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/StreamTracker.java new file mode 100644 index 00000000..befa3709 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/StreamTracker.java @@ -0,0 +1,85 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package software.amazon.kinesis.processor; + +import software.amazon.kinesis.common.InitialPositionInStream; +import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.common.StreamConfig; +import software.amazon.kinesis.common.StreamIdentifier; + +import java.util.List; + +/** + * Interface for stream trackers. + * KCL will periodically probe this interface to learn about the new and old streams. + */ +public interface StreamTracker { + + /** + * Default position to begin consuming records from a Kinesis stream. + * + * @see #orphanedStreamInitialPositionInStream() + */ + InitialPositionInStreamExtended DEFAULT_POSITION_IN_STREAM = + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST); + + /** + * Returns the list of stream config, to be processed by the current application. + * Note that the streams list CAN be changed during the application runtime. + * This method will be called periodically by the KCL to learn about the change in streams to process. + * + * @return List of StreamConfig + */ + List streamConfigList(); + + /** + * Strategy to delete leases of old streams in the lease table. + * Note that the strategy CANNOT be changed during the application runtime. 
+ * + * @return StreamsLeasesDeletionStrategy + */ + FormerStreamsLeasesDeletionStrategy formerStreamsLeasesDeletionStrategy(); + + /** + * The position for getting records from an "orphaned" stream that is in the lease table but not tracked + * Default assumes that the stream no longer need to be tracked, so use LATEST for faster shard end. + * + *

<p>Default value: {@link InitialPositionInStream#LATEST}</p>

+ */ + default InitialPositionInStreamExtended orphanedStreamInitialPositionInStream() { + return DEFAULT_POSITION_IN_STREAM; + } + + /** + * Returns a new {@link StreamConfig} for the provided stream identifier. + * + * @param streamIdentifier stream for which to create a new config + */ + default StreamConfig createStreamConfig(StreamIdentifier streamIdentifier) { + return new StreamConfig(streamIdentifier, orphanedStreamInitialPositionInStream()); + } + + /** + * Returns true if this application should accommodate the consumption of + * more than one Kinesis stream. + *

<p> + * This method must be consistent. Varying the returned value will + * have indeterminate, and likely problematic, effects on stream processing. + * </p>

+ */ + boolean isMultiStream(); + +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java index abb85612..000b71b7 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java @@ -24,12 +24,14 @@ import lombok.ToString; import lombok.experimental.Accessors; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.utils.Either; +import software.amazon.kinesis.common.DeprecationUtils; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; -import software.amazon.kinesis.common.KinesisClientLibraryPackage; import software.amazon.kinesis.common.StreamConfig; import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.processor.MultiStreamTracker; +import software.amazon.kinesis.processor.SingleStreamTracker; +import software.amazon.kinesis.processor.StreamTracker; import software.amazon.kinesis.retrieval.fanout.FanOutConfig; import software.amazon.kinesis.retrieval.polling.PollingConfig; @@ -67,9 +69,17 @@ public class RetrievalConfig { /** * AppStreamTracker either for multi stream tracking or single stream + * + * @deprecated Both single- and multi-stream support is now provided by {@link StreamTracker}. + * @see #streamTracker */ private Either appStreamTracker; + /** + * Stream(s) to be consumed by this KCL application. + */ + private StreamTracker streamTracker; + /** * Backoff time between consecutive ListShards calls. * @@ -95,7 +105,12 @@ public class RetrievalConfig { *

<p> * Default value: {@link InitialPositionInStream#LATEST} * </p>

+ * + * @deprecated Initial stream position is now handled by {@link StreamTracker}. + * @see StreamTracker#orphanedStreamInitialPositionInStream() + * @see StreamTracker#createStreamConfig(StreamIdentifier) */ + @Deprecated private InitialPositionInStreamExtended initialPositionInStreamExtended = InitialPositionInStreamExtended .newInitialPosition(InitialPositionInStream.LATEST); @@ -105,27 +120,36 @@ public class RetrievalConfig { public RetrievalConfig(@NonNull KinesisAsyncClient kinesisAsyncClient, @NonNull String streamName, @NonNull String applicationName) { - this.kinesisClient = kinesisAsyncClient; - this.appStreamTracker = Either - .right(new StreamConfig(StreamIdentifier.singleStreamInstance(streamName), initialPositionInStreamExtended)); - this.applicationName = applicationName; + this(kinesisAsyncClient, new SingleStreamTracker(streamName), applicationName); } - public RetrievalConfig(@NonNull KinesisAsyncClient kinesisAsyncClient, @NonNull MultiStreamTracker multiStreamTracker, + public RetrievalConfig(@NonNull KinesisAsyncClient kinesisAsyncClient, @NonNull StreamTracker streamTracker, @NonNull String applicationName) { this.kinesisClient = kinesisAsyncClient; - this.appStreamTracker = Either.left(multiStreamTracker); + this.streamTracker = streamTracker; this.applicationName = applicationName; + this.appStreamTracker = DeprecationUtils.convert(streamTracker, + singleStreamTracker -> singleStreamTracker.streamConfigList().get(0)); } + /** + * + * @param initialPositionInStreamExtended + * + * @deprecated Initial stream position is now handled by {@link StreamTracker}. 
+ * @see StreamTracker#orphanedStreamInitialPositionInStream() + * @see StreamTracker#createStreamConfig(StreamIdentifier) + */ + @Deprecated public RetrievalConfig initialPositionInStreamExtended(InitialPositionInStreamExtended initialPositionInStreamExtended) { - final StreamConfig[] streamConfig = new StreamConfig[1]; this.appStreamTracker.apply(multiStreamTracker -> { throw new IllegalArgumentException( "Cannot set initialPositionInStreamExtended when multiStreamTracker is set"); - }, sc -> streamConfig[0] = sc); - this.appStreamTracker = Either - .right(new StreamConfig(streamConfig[0].streamIdentifier(), initialPositionInStreamExtended)); + }, sc -> { + final StreamConfig updatedConfig = new StreamConfig(sc.streamIdentifier(), initialPositionInStreamExtended); + streamTracker = new SingleStreamTracker(sc.streamIdentifier(), updatedConfig); + appStreamTracker = Either.right(updatedConfig); + }); return this; } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/common/ConfigsBuilderTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/common/ConfigsBuilderTest.java new file mode 100644 index 00000000..8ea8f818 --- /dev/null +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/common/ConfigsBuilderTest.java @@ -0,0 +1,89 @@ +/* + * Copyright 2023 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package software.amazon.kinesis.common; + +import java.util.Arrays; +import java.util.Optional; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.mockito.Mockito.mock; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.kinesis.processor.MultiStreamTracker; +import software.amazon.kinesis.processor.ShardRecordProcessorFactory; +import software.amazon.kinesis.processor.SingleStreamTracker; +import software.amazon.kinesis.processor.StreamTracker; + +public class ConfigsBuilderTest { + + @Mock + private KinesisAsyncClient mockKinesisClient; + + @Mock + private DynamoDbAsyncClient mockDynamoClient; + + @Mock + private CloudWatchAsyncClient mockCloudWatchClient; + + @Mock + private ShardRecordProcessorFactory mockShardProcessorFactory; + + private static final String APPLICATION_NAME = ConfigsBuilderTest.class.getSimpleName(); + private static final String WORKER_IDENTIFIER = "worker-id"; + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + } + + @Test + public void testTrackerConstruction() { + final String streamName = "single-stream"; + final ConfigsBuilder configByName = createConfig(streamName); + final ConfigsBuilder configBySingleTracker = createConfig(new SingleStreamTracker(streamName)); + + for (final ConfigsBuilder cb : Arrays.asList(configByName, configBySingleTracker)) { + assertEquals(Optional.empty(), cb.appStreamTracker().left()); + assertEquals(streamName, cb.appStreamTracker().right().get()); + assertEquals(streamName, cb.streamTracker().streamConfigList().get(0).streamIdentifier().streamName()); + assertFalse(cb.streamTracker().isMultiStream()); + } + + final 
StreamTracker mockMultiStreamTracker = mock(MultiStreamTracker.class); + final ConfigsBuilder configByMultiTracker = createConfig(mockMultiStreamTracker); + assertEquals(Optional.empty(), configByMultiTracker.appStreamTracker().right()); + assertEquals(mockMultiStreamTracker, configByMultiTracker.appStreamTracker().left().get()); + assertEquals(mockMultiStreamTracker, configByMultiTracker.streamTracker()); + } + + private ConfigsBuilder createConfig(String streamName) { + return new ConfigsBuilder(streamName, APPLICATION_NAME, mockKinesisClient, mockDynamoClient, + mockCloudWatchClient, WORKER_IDENTIFIER, mockShardProcessorFactory); + } + + private ConfigsBuilder createConfig(StreamTracker streamTracker) { + return new ConfigsBuilder(streamTracker, APPLICATION_NAME, mockKinesisClient, mockDynamoClient, + mockCloudWatchClient, WORKER_IDENTIFIER, mockShardProcessorFactory); + } + +} \ No newline at end of file diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/common/DeprecationUtilsTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/common/DeprecationUtilsTest.java new file mode 100644 index 00000000..39991b78 --- /dev/null +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/common/DeprecationUtilsTest.java @@ -0,0 +1,45 @@ +/* + * Copyright 2023 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package software.amazon.kinesis.common; + +import java.util.function.Function; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; + +import org.junit.Test; +import software.amazon.awssdk.utils.Either; +import software.amazon.kinesis.processor.MultiStreamTracker; +import software.amazon.kinesis.processor.SingleStreamTracker; +import software.amazon.kinesis.processor.StreamTracker; + +public class DeprecationUtilsTest { + + @Test + public void testTrackerConversion() { + final StreamTracker mockMultiTracker = mock(MultiStreamTracker.class); + assertEquals(Either.left(mockMultiTracker), DeprecationUtils.convert(mockMultiTracker, Function.identity())); + + final StreamTracker mockSingleTracker = mock(SingleStreamTracker.class); + assertEquals(Either.right(mockSingleTracker), DeprecationUtils.convert(mockSingleTracker, Function.identity())); + } + + @Test(expected = IllegalArgumentException.class) + public void testUnsupportedStreamTrackerConversion() { + DeprecationUtils.convert(mock(StreamTracker.class), Function.identity()); + } + +} \ No newline at end of file diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/processor/SingleStreamTrackerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/processor/SingleStreamTrackerTest.java new file mode 100644 index 00000000..9ae19ba3 --- /dev/null +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/processor/SingleStreamTrackerTest.java @@ -0,0 +1,66 @@ +/* + * Copyright 2023 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package software.amazon.kinesis.processor; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertThat; + +import org.hamcrest.Matchers; +import org.junit.Test; +import software.amazon.kinesis.common.InitialPositionInStream; +import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.common.StreamConfig; +import software.amazon.kinesis.common.StreamIdentifier; + +public class SingleStreamTrackerTest { + + private static final String STREAM_NAME = SingleStreamTrackerTest.class.getSimpleName(); + + @Test + public void testDefaults() { + validate(new SingleStreamTracker(STREAM_NAME)); + validate(new SingleStreamTracker(StreamIdentifier.singleStreamInstance(STREAM_NAME))); + } + + @Test + public void testInitialPositionConstructor() { + final InitialPositionInStreamExtended expectedPosition = + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON); + assertNotEquals(expectedPosition, StreamTracker.DEFAULT_POSITION_IN_STREAM); + + final StreamTracker tracker = new SingleStreamTracker( + StreamIdentifier.singleStreamInstance(STREAM_NAME), expectedPosition); + validate(tracker, expectedPosition); + } + + private static void validate(StreamTracker tracker) { + validate(tracker, StreamTracker.DEFAULT_POSITION_IN_STREAM); + } + + private static void validate(StreamTracker tracker, InitialPositionInStreamExtended expectedPosition) { + assertEquals(1, 
tracker.streamConfigList().size()); + assertFalse(tracker.isMultiStream()); + assertThat(tracker.formerStreamsLeasesDeletionStrategy(), + Matchers.instanceOf(FormerStreamsLeasesDeletionStrategy.NoLeaseDeletionStrategy.class)); + + final StreamConfig config = tracker.streamConfigList().get(0); + assertEquals(STREAM_NAME, config.streamIdentifier().streamName()); + assertEquals(expectedPosition, config.initialPositionInStreamExtended()); + } + +} \ No newline at end of file diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/RetrievalConfigTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/RetrievalConfigTest.java new file mode 100644 index 00000000..041ac71e --- /dev/null +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/RetrievalConfigTest.java @@ -0,0 +1,85 @@ +package software.amazon.kinesis.retrieval; + +import java.util.Arrays; +import java.util.Optional; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.mockito.Mockito.mock; +import static software.amazon.kinesis.common.InitialPositionInStream.LATEST; +import static software.amazon.kinesis.common.InitialPositionInStream.TRIM_HORIZON; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.common.StreamConfig; +import software.amazon.kinesis.processor.MultiStreamTracker; +import software.amazon.kinesis.processor.SingleStreamTracker; +import software.amazon.kinesis.processor.StreamTracker; + +public class RetrievalConfigTest { + + private static final String APPLICATION_NAME = RetrievalConfigTest.class.getSimpleName(); + + @Mock + private KinesisAsyncClient mockKinesisClient; + + @Before + public void setUp() { + 
MockitoAnnotations.initMocks(this); + } + + @Test + public void testTrackerConstruction() { + final String streamName = "single-stream"; + final RetrievalConfig configByName = createConfig(streamName); + final SingleStreamTracker singleTracker = new SingleStreamTracker(streamName); + final RetrievalConfig configBySingleTracker = createConfig(singleTracker); + + for (final RetrievalConfig rc : Arrays.asList(configByName, configBySingleTracker)) { + assertEquals(Optional.empty(), rc.appStreamTracker().left()); + assertEquals(singleTracker, rc.streamTracker()); + assertEquals(1, rc.streamTracker().streamConfigList().size()); + assertFalse(rc.streamTracker().isMultiStream()); + } + + final StreamTracker mockMultiStreamTracker = mock(MultiStreamTracker.class); + final RetrievalConfig configByMultiTracker = createConfig(mockMultiStreamTracker); + assertEquals(Optional.empty(), configByMultiTracker.appStreamTracker().right()); + assertEquals(mockMultiStreamTracker, configByMultiTracker.appStreamTracker().left().get()); + assertEquals(mockMultiStreamTracker, configByMultiTracker.streamTracker()); + } + + @Test + public void testUpdateInitialPositionInSingleStream() { + final RetrievalConfig config = createConfig(new SingleStreamTracker("foo")); + + for (final StreamConfig sc : config.streamTracker().streamConfigList()) { + assertEquals(LATEST, sc.initialPositionInStreamExtended().getInitialPositionInStream()); + } + config.initialPositionInStreamExtended( + InitialPositionInStreamExtended.newInitialPosition(TRIM_HORIZON)); + for (final StreamConfig sc : config.streamTracker().streamConfigList()) { + assertEquals(TRIM_HORIZON, sc.initialPositionInStreamExtended().getInitialPositionInStream()); + } + } + + @Test(expected = IllegalArgumentException.class) + public void testUpdateInitialPositionInMultiStream() { + final RetrievalConfig config = createConfig(mock(MultiStreamTracker.class)); + config.initialPositionInStreamExtended( + 
InitialPositionInStreamExtended.newInitialPosition(TRIM_HORIZON)); + } + + private RetrievalConfig createConfig(String streamName) { + return new RetrievalConfig(mockKinesisClient, streamName, APPLICATION_NAME); + } + + private RetrievalConfig createConfig(StreamTracker streamTracker) { + return new RetrievalConfig(mockKinesisClient, streamTracker, APPLICATION_NAME); + } + +} \ No newline at end of file