Update RetrievalFactory implementations to utilize the StreamIdentifier field of StreamConfig (#1291)
This commit is contained in:
parent
668d59a4b0
commit
443ffc4a7d
5 changed files with 46 additions and 145 deletions
|
|
@ -23,11 +23,34 @@ import software.amazon.kinesis.metrics.MetricsFactory;
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
public interface RetrievalFactory {
|
public interface RetrievalFactory {
|
||||||
GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(ShardInfo shardInfo, MetricsFactory metricsFactory);
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @deprecated This method was only used by specific implementations of {@link RetrievalFactory} and should not be
|
||||||
|
* required to be implemented; will be removed in future versions.
|
||||||
|
*/
|
||||||
@Deprecated
|
@Deprecated
|
||||||
RecordsPublisher createGetRecordsCache(ShardInfo shardInfo, MetricsFactory metricsFactory);
|
default GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(
|
||||||
|
ShardInfo shardInfo, MetricsFactory metricsFactory) {
|
||||||
|
throw new UnsupportedOperationException("This method is deprecated and should not be used.");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @deprecated This method is deprecated and will be removed in future versions.
|
||||||
|
* Please use {@link #createGetRecordsCache(ShardInfo, StreamConfig, MetricsFactory)}.
|
||||||
|
*/
|
||||||
|
@Deprecated
|
||||||
|
default RecordsPublisher createGetRecordsCache(ShardInfo shardInfo, MetricsFactory metricsFactory) {
|
||||||
|
throw new UnsupportedOperationException("This method is deprecated and should not be used.");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a {@link RecordsPublisher} instance to retrieve records for the specified shard.
|
||||||
|
*
|
||||||
|
* @param shardInfo The {@link ShardInfo} representing the shard for which records are to be retrieved.
|
||||||
|
* @param streamConfig The {@link StreamConfig} containing details for the stream.
|
||||||
|
* @param metricsFactory The {@link MetricsFactory} for recording metrics.
|
||||||
|
* @return A {@link RecordsPublisher} instance for retrieving records from the shard.
|
||||||
|
*/
|
||||||
default RecordsPublisher createGetRecordsCache(ShardInfo shardInfo, StreamConfig streamConfig, MetricsFactory metricsFactory) {
|
default RecordsPublisher createGetRecordsCache(ShardInfo shardInfo, StreamConfig streamConfig, MetricsFactory metricsFactory) {
|
||||||
return createGetRecordsCache(shardInfo, metricsFactory);
|
return createGetRecordsCache(shardInfo, metricsFactory);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -23,10 +23,10 @@ import software.amazon.kinesis.common.StreamConfig;
|
||||||
import software.amazon.kinesis.common.StreamIdentifier;
|
import software.amazon.kinesis.common.StreamIdentifier;
|
||||||
import software.amazon.kinesis.leases.ShardInfo;
|
import software.amazon.kinesis.leases.ShardInfo;
|
||||||
import software.amazon.kinesis.metrics.MetricsFactory;
|
import software.amazon.kinesis.metrics.MetricsFactory;
|
||||||
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
|
|
||||||
import software.amazon.kinesis.retrieval.RecordsPublisher;
|
import software.amazon.kinesis.retrieval.RecordsPublisher;
|
||||||
import software.amazon.kinesis.retrieval.RetrievalFactory;
|
import software.amazon.kinesis.retrieval.RetrievalFactory;
|
||||||
|
|
||||||
|
import javax.annotation.Nullable;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
|
@ -41,36 +41,23 @@ public class FanOutRetrievalFactory implements RetrievalFactory {
|
||||||
private final String defaultConsumerArn;
|
private final String defaultConsumerArn;
|
||||||
private final Function<String, String> consumerArnCreator;
|
private final Function<String, String> consumerArnCreator;
|
||||||
|
|
||||||
private Map<StreamIdentifier, String> implicitConsumerArnTracker = new HashMap<>();
|
private final Map<StreamIdentifier, String> implicitConsumerArnTracker = new HashMap<>();
|
||||||
|
|
||||||
@Override
|
|
||||||
public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(final ShardInfo shardInfo,
|
|
||||||
final MetricsFactory metricsFactory) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public RecordsPublisher createGetRecordsCache(@NonNull final ShardInfo shardInfo,
|
public RecordsPublisher createGetRecordsCache(@NonNull final ShardInfo shardInfo,
|
||||||
final StreamConfig streamConfig,
|
@NonNull final StreamConfig streamConfig,
|
||||||
final MetricsFactory metricsFactory) {
|
@Nullable final MetricsFactory metricsFactory) {
|
||||||
final Optional<String> streamIdentifierStr = shardInfo.streamIdentifierSerOpt();
|
final Optional<String> streamIdentifierStr = shardInfo.streamIdentifierSerOpt();
|
||||||
if (streamIdentifierStr.isPresent()) {
|
if (streamIdentifierStr.isPresent()) {
|
||||||
final StreamIdentifier streamIdentifier = StreamIdentifier.multiStreamInstance(streamIdentifierStr.get());
|
|
||||||
return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(),
|
return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(),
|
||||||
getOrCreateConsumerArn(streamIdentifier, streamConfig.consumerArn()),
|
getOrCreateConsumerArn(streamConfig.streamIdentifier(), streamConfig.consumerArn()),
|
||||||
streamIdentifierStr.get());
|
streamIdentifierStr.get());
|
||||||
} else {
|
} else {
|
||||||
final StreamIdentifier streamIdentifier = StreamIdentifier.singleStreamInstance(defaultStreamName);
|
|
||||||
return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(),
|
return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(),
|
||||||
getOrCreateConsumerArn(streamIdentifier, defaultConsumerArn));
|
getOrCreateConsumerArn(streamConfig.streamIdentifier(), defaultConsumerArn));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public RecordsPublisher createGetRecordsCache(ShardInfo shardInfo, MetricsFactory metricsFactory) {
|
|
||||||
throw new UnsupportedOperationException("FanoutRetrievalFactory needs StreamConfig Info");
|
|
||||||
}
|
|
||||||
|
|
||||||
private String getOrCreateConsumerArn(StreamIdentifier streamIdentifier, String consumerArn) {
|
private String getOrCreateConsumerArn(StreamIdentifier streamIdentifier, String consumerArn) {
|
||||||
return consumerArn != null ? consumerArn : implicitConsumerArnTracker
|
return consumerArn != null ? consumerArn : implicitConsumerArnTracker
|
||||||
.computeIfAbsent(streamIdentifier, sId -> consumerArnCreator.apply(sId.streamName()));
|
.computeIfAbsent(streamIdentifier, sId -> consumerArnCreator.apply(sId.streamName()));
|
||||||
|
|
|
||||||
|
|
@ -21,6 +21,7 @@ import lombok.Data;
|
||||||
import lombok.NonNull;
|
import lombok.NonNull;
|
||||||
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
|
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
|
||||||
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
|
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
|
||||||
|
import software.amazon.kinesis.common.StreamConfig;
|
||||||
import software.amazon.kinesis.common.StreamIdentifier;
|
import software.amazon.kinesis.common.StreamIdentifier;
|
||||||
import software.amazon.kinesis.leases.ShardInfo;
|
import software.amazon.kinesis.leases.ShardInfo;
|
||||||
import software.amazon.kinesis.metrics.MetricsFactory;
|
import software.amazon.kinesis.metrics.MetricsFactory;
|
||||||
|
|
@ -50,20 +51,6 @@ public class SynchronousBlockingRetrievalFactory implements RetrievalFactory {
|
||||||
|
|
||||||
private final Function<DataFetcherProviderConfig, DataFetcher> dataFetcherProvider;
|
private final Function<DataFetcherProviderConfig, DataFetcher> dataFetcherProvider;
|
||||||
|
|
||||||
@Deprecated
|
|
||||||
public SynchronousBlockingRetrievalFactory(String streamName,
|
|
||||||
KinesisAsyncClient kinesisClient,
|
|
||||||
RecordsFetcherFactory recordsFetcherFactory,
|
|
||||||
int maxRecords,
|
|
||||||
Duration kinesisRequestTimeout) {
|
|
||||||
this(streamName,
|
|
||||||
kinesisClient,
|
|
||||||
recordsFetcherFactory,
|
|
||||||
maxRecords,
|
|
||||||
kinesisRequestTimeout,
|
|
||||||
defaultDataFetcherProvider(kinesisClient));
|
|
||||||
}
|
|
||||||
|
|
||||||
public SynchronousBlockingRetrievalFactory(String streamName,
|
public SynchronousBlockingRetrievalFactory(String streamName,
|
||||||
KinesisAsyncClient kinesisClient,
|
KinesisAsyncClient kinesisClient,
|
||||||
RecordsFetcherFactory recordsFetcherFactory,
|
RecordsFetcherFactory recordsFetcherFactory,
|
||||||
|
|
@ -79,26 +66,14 @@ public class SynchronousBlockingRetrievalFactory implements RetrievalFactory {
|
||||||
defaultDataFetcherProvider(kinesisClient) : dataFetcherProvider;
|
defaultDataFetcherProvider(kinesisClient) : dataFetcherProvider;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Deprecated
|
|
||||||
public SynchronousBlockingRetrievalFactory(String streamName,
|
|
||||||
KinesisAsyncClient kinesisClient,
|
|
||||||
RecordsFetcherFactory recordsFetcherFactory,
|
|
||||||
int maxRecords) {
|
|
||||||
this(streamName, kinesisClient, recordsFetcherFactory, maxRecords, PollingConfig.DEFAULT_REQUEST_TIMEOUT);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static Function<DataFetcherProviderConfig, DataFetcher> defaultDataFetcherProvider(
|
private static Function<DataFetcherProviderConfig, DataFetcher> defaultDataFetcherProvider(
|
||||||
KinesisAsyncClient kinesisClient) {
|
KinesisAsyncClient kinesisClient) {
|
||||||
return dataFetcherProviderConfig -> new KinesisDataFetcher(kinesisClient, dataFetcherProviderConfig);
|
return dataFetcherProviderConfig -> new KinesisDataFetcher(kinesisClient, dataFetcherProviderConfig);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
private GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(@NonNull final ShardInfo shardInfo,
|
||||||
public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(@NonNull final ShardInfo shardInfo,
|
@NonNull final StreamIdentifier streamIdentifier,
|
||||||
@NonNull final MetricsFactory metricsFactory) {
|
@NonNull final MetricsFactory metricsFactory) {
|
||||||
final StreamIdentifier streamIdentifier = shardInfo.streamIdentifierSerOpt().isPresent() ?
|
|
||||||
StreamIdentifier.multiStreamInstance(shardInfo.streamIdentifierSerOpt().get()) :
|
|
||||||
StreamIdentifier.singleStreamInstance(streamName);
|
|
||||||
|
|
||||||
final DataFetcherProviderConfig kinesisDataFetcherProviderConfig = new KinesisDataFetcherProviderConfig(
|
final DataFetcherProviderConfig kinesisDataFetcherProviderConfig = new KinesisDataFetcherProviderConfig(
|
||||||
streamIdentifier,
|
streamIdentifier,
|
||||||
shardInfo.shardId(),
|
shardInfo.shardId(),
|
||||||
|
|
@ -113,8 +88,12 @@ public class SynchronousBlockingRetrievalFactory implements RetrievalFactory {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public RecordsPublisher createGetRecordsCache(@NonNull final ShardInfo shardInfo,
|
public RecordsPublisher createGetRecordsCache(@NonNull final ShardInfo shardInfo,
|
||||||
|
@NonNull final StreamConfig streamConfig,
|
||||||
@NonNull final MetricsFactory metricsFactory) {
|
@NonNull final MetricsFactory metricsFactory) {
|
||||||
return recordsFetcherFactory.createRecordsFetcher(createGetRecordsRetrievalStrategy(shardInfo, metricsFactory),
|
return recordsFetcherFactory.createRecordsFetcher(
|
||||||
shardInfo.shardId(), metricsFactory, maxRecords);
|
createGetRecordsRetrievalStrategy(shardInfo, streamConfig.streamIdentifier(), metricsFactory),
|
||||||
|
shardInfo.shardId(),
|
||||||
|
metricsFactory,
|
||||||
|
maxRecords);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,93 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright 2019 Amazon.com, Inc. or its affiliates.
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package software.amazon.kinesis.retrieval.polling;
|
|
||||||
|
|
||||||
import java.time.Duration;
|
|
||||||
import java.util.concurrent.ExecutorService;
|
|
||||||
import lombok.NonNull;
|
|
||||||
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
|
|
||||||
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
|
|
||||||
import software.amazon.kinesis.common.StreamIdentifier;
|
|
||||||
import software.amazon.kinesis.leases.ShardInfo;
|
|
||||||
import software.amazon.kinesis.metrics.MetricsFactory;
|
|
||||||
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
|
|
||||||
import software.amazon.kinesis.retrieval.KinesisDataFetcherProviderConfig;
|
|
||||||
import software.amazon.kinesis.retrieval.RecordsFetcherFactory;
|
|
||||||
import software.amazon.kinesis.retrieval.RecordsPublisher;
|
|
||||||
import software.amazon.kinesis.retrieval.RetrievalFactory;
|
|
||||||
|
|
||||||
/**
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
@KinesisClientInternalApi
|
|
||||||
public class SynchronousPrefetchingRetrievalFactory implements RetrievalFactory {
|
|
||||||
@NonNull
|
|
||||||
private final String streamName;
|
|
||||||
@NonNull
|
|
||||||
private final KinesisAsyncClient kinesisClient;
|
|
||||||
@NonNull
|
|
||||||
private final RecordsFetcherFactory recordsFetcherFactory;
|
|
||||||
private final int maxRecords;
|
|
||||||
@NonNull
|
|
||||||
private final ExecutorService executorService;
|
|
||||||
private final long idleMillisBetweenCalls;
|
|
||||||
private final Duration maxFutureWait;
|
|
||||||
|
|
||||||
@Deprecated
|
|
||||||
public SynchronousPrefetchingRetrievalFactory(String streamName, KinesisAsyncClient kinesisClient,
|
|
||||||
RecordsFetcherFactory recordsFetcherFactory, int maxRecords, ExecutorService executorService,
|
|
||||||
long idleMillisBetweenCalls) {
|
|
||||||
this(streamName, kinesisClient, recordsFetcherFactory, maxRecords, executorService, idleMillisBetweenCalls,
|
|
||||||
PollingConfig.DEFAULT_REQUEST_TIMEOUT);
|
|
||||||
}
|
|
||||||
|
|
||||||
public SynchronousPrefetchingRetrievalFactory(String streamName, KinesisAsyncClient kinesisClient,
|
|
||||||
RecordsFetcherFactory recordsFetcherFactory, int maxRecords, ExecutorService executorService,
|
|
||||||
long idleMillisBetweenCalls, Duration maxFutureWait) {
|
|
||||||
this.streamName = streamName;
|
|
||||||
this.kinesisClient = kinesisClient;
|
|
||||||
this.recordsFetcherFactory = recordsFetcherFactory;
|
|
||||||
this.maxRecords = maxRecords;
|
|
||||||
this.executorService = executorService;
|
|
||||||
this.idleMillisBetweenCalls = idleMillisBetweenCalls;
|
|
||||||
this.maxFutureWait = maxFutureWait;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(@NonNull final ShardInfo shardInfo,
|
|
||||||
@NonNull final MetricsFactory metricsFactory) {
|
|
||||||
final StreamIdentifier streamIdentifier = shardInfo.streamIdentifierSerOpt().isPresent() ?
|
|
||||||
StreamIdentifier.multiStreamInstance(shardInfo.streamIdentifierSerOpt().get()) :
|
|
||||||
StreamIdentifier.singleStreamInstance(streamName);
|
|
||||||
|
|
||||||
return new SynchronousGetRecordsRetrievalStrategy(
|
|
||||||
new KinesisDataFetcher(kinesisClient, new KinesisDataFetcherProviderConfig(
|
|
||||||
streamIdentifier,
|
|
||||||
shardInfo.shardId(),
|
|
||||||
metricsFactory,
|
|
||||||
maxRecords,
|
|
||||||
maxFutureWait
|
|
||||||
)));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public RecordsPublisher createGetRecordsCache(@NonNull final ShardInfo shardInfo,
|
|
||||||
@NonNull final MetricsFactory metricsFactory) {
|
|
||||||
return new PrefetchRecordsPublisher(recordsFetcherFactory.maxPendingProcessRecordsInput(),
|
|
||||||
recordsFetcherFactory.maxByteSize(), recordsFetcherFactory.maxRecordsCount(), maxRecords,
|
|
||||||
createGetRecordsRetrievalStrategy(shardInfo, metricsFactory), executorService, idleMillisBetweenCalls,
|
|
||||||
metricsFactory, "Prefetching", shardInfo.shardId());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
@ -37,6 +37,7 @@ import org.mockito.runners.MockitoJUnitRunner;
|
||||||
|
|
||||||
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
|
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
|
||||||
import software.amazon.kinesis.common.StreamConfig;
|
import software.amazon.kinesis.common.StreamConfig;
|
||||||
|
import software.amazon.kinesis.common.StreamIdentifier;
|
||||||
import software.amazon.kinesis.leases.ShardInfo;
|
import software.amazon.kinesis.leases.ShardInfo;
|
||||||
import software.amazon.kinesis.leases.exceptions.DependencyException;
|
import software.amazon.kinesis.leases.exceptions.DependencyException;
|
||||||
import software.amazon.kinesis.metrics.MetricsFactory;
|
import software.amazon.kinesis.metrics.MetricsFactory;
|
||||||
|
|
@ -58,6 +59,8 @@ public class FanOutConfigTest {
|
||||||
private KinesisAsyncClient kinesisClient;
|
private KinesisAsyncClient kinesisClient;
|
||||||
@Mock
|
@Mock
|
||||||
private StreamConfig streamConfig;
|
private StreamConfig streamConfig;
|
||||||
|
@Mock
|
||||||
|
private StreamIdentifier streamIdentifier;
|
||||||
|
|
||||||
private FanOutConfig config;
|
private FanOutConfig config;
|
||||||
|
|
||||||
|
|
@ -69,6 +72,8 @@ public class FanOutConfigTest {
|
||||||
.streamName(TEST_STREAM_NAME);
|
.streamName(TEST_STREAM_NAME);
|
||||||
doReturn(consumerRegistration).when(config)
|
doReturn(consumerRegistration).when(config)
|
||||||
.createConsumerRegistration(eq(kinesisClient), anyString(), anyString());
|
.createConsumerRegistration(eq(kinesisClient), anyString(), anyString());
|
||||||
|
when(streamConfig.streamIdentifier()).thenReturn(streamIdentifier);
|
||||||
|
when(streamIdentifier.streamName()).thenReturn(TEST_STREAM_NAME);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue