diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalFactory.java index 5703e1af..04727294 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalFactory.java @@ -23,11 +23,34 @@ import software.amazon.kinesis.metrics.MetricsFactory; * */ public interface RetrievalFactory { - GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(ShardInfo shardInfo, MetricsFactory metricsFactory); + /** + * @deprecated This method was only used by specific implementations of {@link RetrievalFactory} and should not be + * required to be implemented; will be removed in future versions. + */ @Deprecated - RecordsPublisher createGetRecordsCache(ShardInfo shardInfo, MetricsFactory metricsFactory); + default GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy( + ShardInfo shardInfo, MetricsFactory metricsFactory) { + throw new UnsupportedOperationException("This method is deprecated and should not be used."); + } + /** + * @deprecated This method is deprecated and will be removed in future versions. + * Please use {@link #createGetRecordsCache(ShardInfo, StreamConfig, MetricsFactory)}. + */ + @Deprecated + default RecordsPublisher createGetRecordsCache(ShardInfo shardInfo, MetricsFactory metricsFactory) { + throw new UnsupportedOperationException("This method is deprecated and should not be used."); + } + + /** + * Creates a {@link RecordsPublisher} instance to retrieve records for the specified shard. + * + * @param shardInfo The {@link ShardInfo} representing the shard for which records are to be retrieved. + * @param streamConfig The {@link StreamConfig} containing details for the stream. + * @param metricsFactory The {@link MetricsFactory} for recording metrics. + * @return A {@link RecordsPublisher} instance for retrieving records from the shard. + */ default RecordsPublisher createGetRecordsCache(ShardInfo shardInfo, StreamConfig streamConfig, MetricsFactory metricsFactory) { return createGetRecordsCache(shardInfo, metricsFactory); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java index bcfb1081..a83d0370 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java @@ -23,10 +23,10 @@ import software.amazon.kinesis.common.StreamConfig; import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.metrics.MetricsFactory; -import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RetrievalFactory; +import javax.annotation.Nullable; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -41,36 +41,23 @@ public class FanOutRetrievalFactory implements RetrievalFactory { private final String defaultConsumerArn; private final Function consumerArnCreator; - private Map implicitConsumerArnTracker = new HashMap<>(); - - @Override - public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(final ShardInfo shardInfo, - final MetricsFactory metricsFactory) { - return null; - } + private final Map implicitConsumerArnTracker = new HashMap<>(); @Override public RecordsPublisher createGetRecordsCache(@NonNull final ShardInfo shardInfo, - final StreamConfig streamConfig, - final MetricsFactory metricsFactory) { + @NonNull final StreamConfig streamConfig, + @Nullable final MetricsFactory metricsFactory) { final Optional streamIdentifierStr = shardInfo.streamIdentifierSerOpt(); if (streamIdentifierStr.isPresent()) { - final StreamIdentifier streamIdentifier = StreamIdentifier.multiStreamInstance(streamIdentifierStr.get()); return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(), - getOrCreateConsumerArn(streamIdentifier, streamConfig.consumerArn()), + getOrCreateConsumerArn(streamConfig.streamIdentifier(), streamConfig.consumerArn()), streamIdentifierStr.get()); } else { - final StreamIdentifier streamIdentifier = StreamIdentifier.singleStreamInstance(defaultStreamName); return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(), - getOrCreateConsumerArn(streamIdentifier, defaultConsumerArn)); + getOrCreateConsumerArn(streamConfig.streamIdentifier(), defaultConsumerArn)); } } - @Override - public RecordsPublisher createGetRecordsCache(ShardInfo shardInfo, MetricsFactory metricsFactory) { - throw new UnsupportedOperationException("FanoutRetrievalFactory needs StreamConfig Info"); - } - private String getOrCreateConsumerArn(StreamIdentifier streamIdentifier, String consumerArn) { return consumerArn != null ? consumerArn : implicitConsumerArnTracker .computeIfAbsent(streamIdentifier, sId -> consumerArnCreator.apply(sId.streamName())); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousBlockingRetrievalFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousBlockingRetrievalFactory.java index 071763fc..9b3190d5 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousBlockingRetrievalFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousBlockingRetrievalFactory.java @@ -21,6 +21,7 @@ import lombok.Data; import lombok.NonNull; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.annotations.KinesisClientInternalApi; +import software.amazon.kinesis.common.StreamConfig; import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.metrics.MetricsFactory; @@ -50,20 +51,6 @@ public class SynchronousBlockingRetrievalFactory implements RetrievalFactory { private final Function dataFetcherProvider; - @Deprecated - public SynchronousBlockingRetrievalFactory(String streamName, - KinesisAsyncClient kinesisClient, - RecordsFetcherFactory recordsFetcherFactory, - int maxRecords, - Duration kinesisRequestTimeout) { - this(streamName, - kinesisClient, - recordsFetcherFactory, - maxRecords, - kinesisRequestTimeout, - defaultDataFetcherProvider(kinesisClient)); - } - public SynchronousBlockingRetrievalFactory(String streamName, KinesisAsyncClient kinesisClient, RecordsFetcherFactory recordsFetcherFactory, @@ -79,26 +66,14 @@ public class SynchronousBlockingRetrievalFactory implements RetrievalFactory { defaultDataFetcherProvider(kinesisClient) : dataFetcherProvider; } - @Deprecated - public SynchronousBlockingRetrievalFactory(String streamName, - KinesisAsyncClient kinesisClient, - RecordsFetcherFactory recordsFetcherFactory, - int maxRecords) { - this(streamName, kinesisClient, recordsFetcherFactory, maxRecords, PollingConfig.DEFAULT_REQUEST_TIMEOUT); - } - private static Function defaultDataFetcherProvider( KinesisAsyncClient kinesisClient) { return dataFetcherProviderConfig -> new KinesisDataFetcher(kinesisClient, dataFetcherProviderConfig); } - @Override - public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(@NonNull final ShardInfo shardInfo, - @NonNull final MetricsFactory metricsFactory) { - final StreamIdentifier streamIdentifier = shardInfo.streamIdentifierSerOpt().isPresent() ? - StreamIdentifier.multiStreamInstance(shardInfo.streamIdentifierSerOpt().get()) : - StreamIdentifier.singleStreamInstance(streamName); - + private GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(@NonNull final ShardInfo shardInfo, + @NonNull final StreamIdentifier streamIdentifier, + @NonNull final MetricsFactory metricsFactory) { final DataFetcherProviderConfig kinesisDataFetcherProviderConfig = new KinesisDataFetcherProviderConfig( streamIdentifier, shardInfo.shardId(), @@ -113,8 +88,12 @@ public class SynchronousBlockingRetrievalFactory implements RetrievalFactory { @Override public RecordsPublisher createGetRecordsCache(@NonNull final ShardInfo shardInfo, + @NonNull final StreamConfig streamConfig, @NonNull final MetricsFactory metricsFactory) { - return recordsFetcherFactory.createRecordsFetcher(createGetRecordsRetrievalStrategy(shardInfo, metricsFactory), - shardInfo.shardId(), metricsFactory, maxRecords); + return recordsFetcherFactory.createRecordsFetcher( + createGetRecordsRetrievalStrategy(shardInfo, streamConfig.streamIdentifier(), metricsFactory), + shardInfo.shardId(), + metricsFactory, + maxRecords); } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousPrefetchingRetrievalFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousPrefetchingRetrievalFactory.java deleted file mode 100644 index efa11e70..00000000 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousPrefetchingRetrievalFactory.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Copyright 2019 Amazon.com, Inc. or its affiliates. - * Licensed under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package software.amazon.kinesis.retrieval.polling; - -import java.time.Duration; -import java.util.concurrent.ExecutorService; -import lombok.NonNull; -import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; -import software.amazon.kinesis.annotations.KinesisClientInternalApi; -import software.amazon.kinesis.common.StreamIdentifier; -import software.amazon.kinesis.leases.ShardInfo; -import software.amazon.kinesis.metrics.MetricsFactory; -import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; -import software.amazon.kinesis.retrieval.KinesisDataFetcherProviderConfig; -import software.amazon.kinesis.retrieval.RecordsFetcherFactory; -import software.amazon.kinesis.retrieval.RecordsPublisher; -import software.amazon.kinesis.retrieval.RetrievalFactory; - -/** - * - */ -@KinesisClientInternalApi -public class SynchronousPrefetchingRetrievalFactory implements RetrievalFactory { - @NonNull - private final String streamName; - @NonNull - private final KinesisAsyncClient kinesisClient; - @NonNull - private final RecordsFetcherFactory recordsFetcherFactory; - private final int maxRecords; - @NonNull - private final ExecutorService executorService; - private final long idleMillisBetweenCalls; - private final Duration maxFutureWait; - - @Deprecated - public SynchronousPrefetchingRetrievalFactory(String streamName, KinesisAsyncClient kinesisClient, - RecordsFetcherFactory recordsFetcherFactory, int maxRecords, ExecutorService executorService, - long idleMillisBetweenCalls) { - this(streamName, kinesisClient, recordsFetcherFactory, maxRecords, executorService, idleMillisBetweenCalls, - PollingConfig.DEFAULT_REQUEST_TIMEOUT); - } - - public SynchronousPrefetchingRetrievalFactory(String streamName, KinesisAsyncClient kinesisClient, - RecordsFetcherFactory recordsFetcherFactory, int maxRecords, ExecutorService executorService, - long idleMillisBetweenCalls, Duration maxFutureWait) { - this.streamName = streamName; - this.kinesisClient = kinesisClient; - this.recordsFetcherFactory = recordsFetcherFactory; - this.maxRecords = maxRecords; - this.executorService = executorService; - this.idleMillisBetweenCalls = idleMillisBetweenCalls; - this.maxFutureWait = maxFutureWait; - } - - @Override public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(@NonNull final ShardInfo shardInfo, - @NonNull final MetricsFactory metricsFactory) { - final StreamIdentifier streamIdentifier = shardInfo.streamIdentifierSerOpt().isPresent() ? - StreamIdentifier.multiStreamInstance(shardInfo.streamIdentifierSerOpt().get()) : - StreamIdentifier.singleStreamInstance(streamName); - - return new SynchronousGetRecordsRetrievalStrategy( - new KinesisDataFetcher(kinesisClient, new KinesisDataFetcherProviderConfig( - streamIdentifier, - shardInfo.shardId(), - metricsFactory, - maxRecords, - maxFutureWait - ))); - } - - @Override - public RecordsPublisher createGetRecordsCache(@NonNull final ShardInfo shardInfo, - @NonNull final MetricsFactory metricsFactory) { - return new PrefetchRecordsPublisher(recordsFetcherFactory.maxPendingProcessRecordsInput(), - recordsFetcherFactory.maxByteSize(), recordsFetcherFactory.maxRecordsCount(), maxRecords, - createGetRecordsRetrievalStrategy(shardInfo, metricsFactory), executorService, idleMillisBetweenCalls, - metricsFactory, "Prefetching", shardInfo.shardId()); - } -} diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutConfigTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutConfigTest.java index 58454087..c5727e20 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutConfigTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutConfigTest.java @@ -37,6 +37,7 @@ import org.mockito.runners.MockitoJUnitRunner; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.StreamConfig; +import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.metrics.MetricsFactory; @@ -58,6 +59,8 @@ public class FanOutConfigTest { private KinesisAsyncClient kinesisClient; @Mock private StreamConfig streamConfig; + @Mock + private StreamIdentifier streamIdentifier; private FanOutConfig config; @@ -69,6 +72,8 @@ public class FanOutConfigTest { .streamName(TEST_STREAM_NAME); doReturn(consumerRegistration).when(config) .createConsumerRegistration(eq(kinesisClient), anyString(), anyString()); + when(streamConfig.streamIdentifier()).thenReturn(streamIdentifier); + when(streamIdentifier.streamName()).thenReturn(TEST_STREAM_NAME); } @Test