Update RetrievalFactory implementations to utilize the StreamIdentifier field of StreamConfig (#1291)
This commit is contained in:
parent
969341130a
commit
e9990190cc
5 changed files with 46 additions and 145 deletions
|
|
@ -23,11 +23,34 @@ import software.amazon.kinesis.metrics.MetricsFactory;
|
|||
*
|
||||
*/
|
||||
public interface RetrievalFactory {
|
||||
GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(ShardInfo shardInfo, MetricsFactory metricsFactory);
|
||||
|
||||
/**
|
||||
* @deprecated This method was only used by specific implementations of {@link RetrievalFactory} and should not be
|
||||
* required to be implemented; will be removed in future versions.
|
||||
*/
|
||||
@Deprecated
|
||||
RecordsPublisher createGetRecordsCache(ShardInfo shardInfo, MetricsFactory metricsFactory);
|
||||
default GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(
|
||||
ShardInfo shardInfo, MetricsFactory metricsFactory) {
|
||||
throw new UnsupportedOperationException("This method is deprecated and should not be used.");
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated This method is deprecated and will be removed in future versions.
|
||||
* Please use {@link #createGetRecordsCache(ShardInfo, StreamConfig, MetricsFactory)}.
|
||||
*/
|
||||
@Deprecated
|
||||
default RecordsPublisher createGetRecordsCache(ShardInfo shardInfo, MetricsFactory metricsFactory) {
|
||||
throw new UnsupportedOperationException("This method is deprecated and should not be used.");
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a {@link RecordsPublisher} instance to retrieve records for the specified shard.
|
||||
*
|
||||
* @param shardInfo The {@link ShardInfo} representing the shard for which records are to be retrieved.
|
||||
* @param streamConfig The {@link StreamConfig} containing details for the stream.
|
||||
* @param metricsFactory The {@link MetricsFactory} for recording metrics.
|
||||
* @return A {@link RecordsPublisher} instance for retrieving records from the shard.
|
||||
*/
|
||||
default RecordsPublisher createGetRecordsCache(ShardInfo shardInfo, StreamConfig streamConfig, MetricsFactory metricsFactory) {
|
||||
return createGetRecordsCache(shardInfo, metricsFactory);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -23,10 +23,10 @@ import software.amazon.kinesis.common.StreamConfig;
|
|||
import software.amazon.kinesis.common.StreamIdentifier;
|
||||
import software.amazon.kinesis.leases.ShardInfo;
|
||||
import software.amazon.kinesis.metrics.MetricsFactory;
|
||||
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
|
||||
import software.amazon.kinesis.retrieval.RecordsPublisher;
|
||||
import software.amazon.kinesis.retrieval.RetrievalFactory;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
|
@ -41,36 +41,23 @@ public class FanOutRetrievalFactory implements RetrievalFactory {
|
|||
private final String defaultConsumerArn;
|
||||
private final Function<String, String> consumerArnCreator;
|
||||
|
||||
private Map<StreamIdentifier, String> implicitConsumerArnTracker = new HashMap<>();
|
||||
|
||||
@Override
|
||||
public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(final ShardInfo shardInfo,
|
||||
final MetricsFactory metricsFactory) {
|
||||
return null;
|
||||
}
|
||||
private final Map<StreamIdentifier, String> implicitConsumerArnTracker = new HashMap<>();
|
||||
|
||||
@Override
|
||||
public RecordsPublisher createGetRecordsCache(@NonNull final ShardInfo shardInfo,
|
||||
final StreamConfig streamConfig,
|
||||
final MetricsFactory metricsFactory) {
|
||||
@NonNull final StreamConfig streamConfig,
|
||||
@Nullable final MetricsFactory metricsFactory) {
|
||||
final Optional<String> streamIdentifierStr = shardInfo.streamIdentifierSerOpt();
|
||||
if (streamIdentifierStr.isPresent()) {
|
||||
final StreamIdentifier streamIdentifier = StreamIdentifier.multiStreamInstance(streamIdentifierStr.get());
|
||||
return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(),
|
||||
getOrCreateConsumerArn(streamIdentifier, streamConfig.consumerArn()),
|
||||
getOrCreateConsumerArn(streamConfig.streamIdentifier(), streamConfig.consumerArn()),
|
||||
streamIdentifierStr.get());
|
||||
} else {
|
||||
final StreamIdentifier streamIdentifier = StreamIdentifier.singleStreamInstance(defaultStreamName);
|
||||
return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(),
|
||||
getOrCreateConsumerArn(streamIdentifier, defaultConsumerArn));
|
||||
getOrCreateConsumerArn(streamConfig.streamIdentifier(), defaultConsumerArn));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public RecordsPublisher createGetRecordsCache(ShardInfo shardInfo, MetricsFactory metricsFactory) {
|
||||
throw new UnsupportedOperationException("FanoutRetrievalFactory needs StreamConfig Info");
|
||||
}
|
||||
|
||||
private String getOrCreateConsumerArn(StreamIdentifier streamIdentifier, String consumerArn) {
|
||||
return consumerArn != null ? consumerArn : implicitConsumerArnTracker
|
||||
.computeIfAbsent(streamIdentifier, sId -> consumerArnCreator.apply(sId.streamName()));
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ import lombok.Data;
|
|||
import lombok.NonNull;
|
||||
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
|
||||
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
|
||||
import software.amazon.kinesis.common.StreamConfig;
|
||||
import software.amazon.kinesis.common.StreamIdentifier;
|
||||
import software.amazon.kinesis.leases.ShardInfo;
|
||||
import software.amazon.kinesis.metrics.MetricsFactory;
|
||||
|
|
@ -50,20 +51,6 @@ public class SynchronousBlockingRetrievalFactory implements RetrievalFactory {
|
|||
|
||||
private final Function<DataFetcherProviderConfig, DataFetcher> dataFetcherProvider;
|
||||
|
||||
@Deprecated
|
||||
public SynchronousBlockingRetrievalFactory(String streamName,
|
||||
KinesisAsyncClient kinesisClient,
|
||||
RecordsFetcherFactory recordsFetcherFactory,
|
||||
int maxRecords,
|
||||
Duration kinesisRequestTimeout) {
|
||||
this(streamName,
|
||||
kinesisClient,
|
||||
recordsFetcherFactory,
|
||||
maxRecords,
|
||||
kinesisRequestTimeout,
|
||||
defaultDataFetcherProvider(kinesisClient));
|
||||
}
|
||||
|
||||
public SynchronousBlockingRetrievalFactory(String streamName,
|
||||
KinesisAsyncClient kinesisClient,
|
||||
RecordsFetcherFactory recordsFetcherFactory,
|
||||
|
|
@ -79,26 +66,14 @@ public class SynchronousBlockingRetrievalFactory implements RetrievalFactory {
|
|||
defaultDataFetcherProvider(kinesisClient) : dataFetcherProvider;
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public SynchronousBlockingRetrievalFactory(String streamName,
|
||||
KinesisAsyncClient kinesisClient,
|
||||
RecordsFetcherFactory recordsFetcherFactory,
|
||||
int maxRecords) {
|
||||
this(streamName, kinesisClient, recordsFetcherFactory, maxRecords, PollingConfig.DEFAULT_REQUEST_TIMEOUT);
|
||||
}
|
||||
|
||||
private static Function<DataFetcherProviderConfig, DataFetcher> defaultDataFetcherProvider(
|
||||
KinesisAsyncClient kinesisClient) {
|
||||
return dataFetcherProviderConfig -> new KinesisDataFetcher(kinesisClient, dataFetcherProviderConfig);
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(@NonNull final ShardInfo shardInfo,
|
||||
@NonNull final MetricsFactory metricsFactory) {
|
||||
final StreamIdentifier streamIdentifier = shardInfo.streamIdentifierSerOpt().isPresent() ?
|
||||
StreamIdentifier.multiStreamInstance(shardInfo.streamIdentifierSerOpt().get()) :
|
||||
StreamIdentifier.singleStreamInstance(streamName);
|
||||
|
||||
private GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(@NonNull final ShardInfo shardInfo,
|
||||
@NonNull final StreamIdentifier streamIdentifier,
|
||||
@NonNull final MetricsFactory metricsFactory) {
|
||||
final DataFetcherProviderConfig kinesisDataFetcherProviderConfig = new KinesisDataFetcherProviderConfig(
|
||||
streamIdentifier,
|
||||
shardInfo.shardId(),
|
||||
|
|
@ -113,8 +88,12 @@ public class SynchronousBlockingRetrievalFactory implements RetrievalFactory {
|
|||
|
||||
@Override
|
||||
public RecordsPublisher createGetRecordsCache(@NonNull final ShardInfo shardInfo,
|
||||
@NonNull final StreamConfig streamConfig,
|
||||
@NonNull final MetricsFactory metricsFactory) {
|
||||
return recordsFetcherFactory.createRecordsFetcher(createGetRecordsRetrievalStrategy(shardInfo, metricsFactory),
|
||||
shardInfo.shardId(), metricsFactory, maxRecords);
|
||||
return recordsFetcherFactory.createRecordsFetcher(
|
||||
createGetRecordsRetrievalStrategy(shardInfo, streamConfig.streamIdentifier(), metricsFactory),
|
||||
shardInfo.shardId(),
|
||||
metricsFactory,
|
||||
maxRecords);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,93 +0,0 @@
|
|||
/*
|
||||
* Copyright 2019 Amazon.com, Inc. or its affiliates.
|
||||
* Licensed under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package software.amazon.kinesis.retrieval.polling;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import lombok.NonNull;
|
||||
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
|
||||
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
|
||||
import software.amazon.kinesis.common.StreamIdentifier;
|
||||
import software.amazon.kinesis.leases.ShardInfo;
|
||||
import software.amazon.kinesis.metrics.MetricsFactory;
|
||||
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
|
||||
import software.amazon.kinesis.retrieval.KinesisDataFetcherProviderConfig;
|
||||
import software.amazon.kinesis.retrieval.RecordsFetcherFactory;
|
||||
import software.amazon.kinesis.retrieval.RecordsPublisher;
|
||||
import software.amazon.kinesis.retrieval.RetrievalFactory;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
@KinesisClientInternalApi
|
||||
public class SynchronousPrefetchingRetrievalFactory implements RetrievalFactory {
|
||||
@NonNull
|
||||
private final String streamName;
|
||||
@NonNull
|
||||
private final KinesisAsyncClient kinesisClient;
|
||||
@NonNull
|
||||
private final RecordsFetcherFactory recordsFetcherFactory;
|
||||
private final int maxRecords;
|
||||
@NonNull
|
||||
private final ExecutorService executorService;
|
||||
private final long idleMillisBetweenCalls;
|
||||
private final Duration maxFutureWait;
|
||||
|
||||
@Deprecated
|
||||
public SynchronousPrefetchingRetrievalFactory(String streamName, KinesisAsyncClient kinesisClient,
|
||||
RecordsFetcherFactory recordsFetcherFactory, int maxRecords, ExecutorService executorService,
|
||||
long idleMillisBetweenCalls) {
|
||||
this(streamName, kinesisClient, recordsFetcherFactory, maxRecords, executorService, idleMillisBetweenCalls,
|
||||
PollingConfig.DEFAULT_REQUEST_TIMEOUT);
|
||||
}
|
||||
|
||||
public SynchronousPrefetchingRetrievalFactory(String streamName, KinesisAsyncClient kinesisClient,
|
||||
RecordsFetcherFactory recordsFetcherFactory, int maxRecords, ExecutorService executorService,
|
||||
long idleMillisBetweenCalls, Duration maxFutureWait) {
|
||||
this.streamName = streamName;
|
||||
this.kinesisClient = kinesisClient;
|
||||
this.recordsFetcherFactory = recordsFetcherFactory;
|
||||
this.maxRecords = maxRecords;
|
||||
this.executorService = executorService;
|
||||
this.idleMillisBetweenCalls = idleMillisBetweenCalls;
|
||||
this.maxFutureWait = maxFutureWait;
|
||||
}
|
||||
|
||||
@Override public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(@NonNull final ShardInfo shardInfo,
|
||||
@NonNull final MetricsFactory metricsFactory) {
|
||||
final StreamIdentifier streamIdentifier = shardInfo.streamIdentifierSerOpt().isPresent() ?
|
||||
StreamIdentifier.multiStreamInstance(shardInfo.streamIdentifierSerOpt().get()) :
|
||||
StreamIdentifier.singleStreamInstance(streamName);
|
||||
|
||||
return new SynchronousGetRecordsRetrievalStrategy(
|
||||
new KinesisDataFetcher(kinesisClient, new KinesisDataFetcherProviderConfig(
|
||||
streamIdentifier,
|
||||
shardInfo.shardId(),
|
||||
metricsFactory,
|
||||
maxRecords,
|
||||
maxFutureWait
|
||||
)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public RecordsPublisher createGetRecordsCache(@NonNull final ShardInfo shardInfo,
|
||||
@NonNull final MetricsFactory metricsFactory) {
|
||||
return new PrefetchRecordsPublisher(recordsFetcherFactory.maxPendingProcessRecordsInput(),
|
||||
recordsFetcherFactory.maxByteSize(), recordsFetcherFactory.maxRecordsCount(), maxRecords,
|
||||
createGetRecordsRetrievalStrategy(shardInfo, metricsFactory), executorService, idleMillisBetweenCalls,
|
||||
metricsFactory, "Prefetching", shardInfo.shardId());
|
||||
}
|
||||
}
|
||||
|
|
@ -37,6 +37,7 @@ import org.mockito.runners.MockitoJUnitRunner;
|
|||
|
||||
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
|
||||
import software.amazon.kinesis.common.StreamConfig;
|
||||
import software.amazon.kinesis.common.StreamIdentifier;
|
||||
import software.amazon.kinesis.leases.ShardInfo;
|
||||
import software.amazon.kinesis.leases.exceptions.DependencyException;
|
||||
import software.amazon.kinesis.metrics.MetricsFactory;
|
||||
|
|
@ -58,6 +59,8 @@ public class FanOutConfigTest {
|
|||
private KinesisAsyncClient kinesisClient;
|
||||
@Mock
|
||||
private StreamConfig streamConfig;
|
||||
@Mock
|
||||
private StreamIdentifier streamIdentifier;
|
||||
|
||||
private FanOutConfig config;
|
||||
|
||||
|
|
@ -69,6 +72,8 @@ public class FanOutConfigTest {
|
|||
.streamName(TEST_STREAM_NAME);
|
||||
doReturn(consumerRegistration).when(config)
|
||||
.createConsumerRegistration(eq(kinesisClient), anyString(), anyString());
|
||||
when(streamConfig.streamIdentifier()).thenReturn(streamIdentifier);
|
||||
when(streamIdentifier.streamName()).thenReturn(TEST_STREAM_NAME);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
|||
Loading…
Reference in a new issue