Merge pull request #14 from Renjuju/cherry-pick-interface-changes

[Cherry-pick to ltr_1] Add DataFetcher and ShardDetector interface changes (#9)
This commit is contained in:
ashwing 2020-04-22 00:41:20 -07:00 committed by GitHub
commit a5f251014e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
21 changed files with 545 additions and 186 deletions

View file

@ -15,6 +15,10 @@
package software.amazon.kinesis.coordinator;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import io.reactivex.plugins.RxJavaPlugins;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@ -29,14 +33,12 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import io.reactivex.plugins.RxJavaPlugins;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.NoArgsConstructor;
@ -50,6 +52,7 @@ import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseManagementConfig;
@ -59,9 +62,7 @@ import software.amazon.kinesis.leases.MultiStreamLease;
import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.ShardPrioritization;
import software.amazon.kinesis.leases.ShardSyncTask;
import software.amazon.kinesis.leases.ShardSyncTaskManager;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseCoordinator;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseSerializer;
import software.amazon.kinesis.leases.dynamodb.DynamoDBMultiStreamLeaseSerializer;
@ -77,7 +78,6 @@ import software.amazon.kinesis.lifecycle.ShutdownNotification;
import software.amazon.kinesis.lifecycle.ShutdownReason;
import software.amazon.kinesis.lifecycle.TaskResult;
import software.amazon.kinesis.metrics.CloudWatchMetricsFactory;
import software.amazon.kinesis.metrics.MetricsCollectingTaskDecorator;
import software.amazon.kinesis.metrics.MetricsConfig;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.processor.Checkpointer;
@ -89,9 +89,6 @@ import software.amazon.kinesis.retrieval.AggregatorUtil;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RetrievalConfig;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
/**
*
*/
@ -326,6 +323,7 @@ public class Scheduler implements Runnable {
if (shouldInitiateLeaseSync()) {
log.info("Worker {} is initiating the lease sync.", leaseManagementConfig.workerIdentifier());
leaderElectedPeriodicShardSyncManager.syncShardsOnce();
}
} else {
log.info("Skipping shard sync per configuration setting (and lease table is not empty)");
@ -551,7 +549,6 @@ public class Scheduler implements Runnable {
* Requests a graceful shutdown of the worker, notifying record processors, that implement
* {@link ShutdownNotificationAware}, of the impending shutdown. This gives the record processor a final chance to
* checkpoint.
*
* This will only create a single shutdown future. Additional attempts to start a graceful shutdown will return the
* previous future.
*
@ -597,8 +594,7 @@ public class Scheduler implements Runnable {
*
* @return a callable that run the graceful shutdown process. This may return a callable that return true if the
* graceful shutdown has already been completed.
* @throws IllegalStateException
* thrown by the callable if another callable has already started the shutdown process.
* @throws IllegalStateException thrown by the callable if another callable has already started the shutdown process.
*/
public Callable<Boolean> createGracefulShutdownCallable() {
if (shutdownComplete()) {
@ -740,8 +736,7 @@ public class Scheduler implements Runnable {
/**
* NOTE: This method is internal/private to the Worker class. It has package access solely for testing.
*
* @param shardInfo
* Kinesis shard info
* @param shardInfo Kinesis shard info
* @return ShardConsumer for the shard
*/
ShardConsumer createOrGetShardConsumer(@NonNull final ShardInfo shardInfo,
@ -806,7 +801,6 @@ public class Scheduler implements Runnable {
/**
* NOTE: This method is internal/private to the Worker class. It has package access solely for testing.
* <p>
* This method relies on ShardInfo.equals() method returning true for ShardInfo objects which may have been
* instantiated with parentShardIds in a different order (and rest of the fields being the equal). For example
* shardInfo1.equals(shardInfo2) should return true with shardInfo1 and shardInfo2 defined as follows. ShardInfo
@ -851,7 +845,7 @@ public class Scheduler implements Runnable {
private StreamIdentifier getStreamIdentifier(Optional<String> streamIdentifierString) {
final StreamIdentifier streamIdentifier;
if(streamIdentifierString.isPresent()) {
if (streamIdentifierString.isPresent()) {
streamIdentifier = StreamIdentifier.multiStreamInstance(streamIdentifierString.get());
} else {
Validate.isTrue(!isMultiStreamMode, "Should not be in MultiStream Mode");

View file

@ -26,15 +26,13 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.NonNull;
import lombok.Synchronized;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.KinesisException;
import software.amazon.awssdk.services.kinesis.model.LimitExceededException;

View file

@ -15,17 +15,15 @@
package software.amazon.kinesis.leases;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.function.Function;
import lombok.Data;
import lombok.NonNull;
import lombok.experimental.Accessors;
@ -35,11 +33,11 @@ import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseManagementFactory;
import software.amazon.kinesis.leases.dynamodb.TableCreatorCallback;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.NullMetricsFactory;
import software.amazon.kinesis.processor.MultiStreamTracker;
/**
* Used by the KCL to configure lease management.
@ -145,6 +143,11 @@ public class LeaseManagementConfig {
*/
private int initialLeaseTableWriteCapacity = 10;
/**
* Configurable functional interface to override the existing shardDetector.
*/
private Function<StreamConfig, ShardDetector> customShardDetectorProvider;
/**
* The size of the thread pool to create for the lease renewer to use.
*
@ -352,7 +355,8 @@ public class LeaseManagementConfig {
tableCreatorCallback(),
dynamoDbRequestTimeout(),
billingMode(),
leaseSerializer);
leaseSerializer,
customShardDetectorProvider());
}
return leaseManagementFactory;
}
@ -366,5 +370,4 @@ public class LeaseManagementConfig {
this.leaseManagementFactory = leaseManagementFactory;
return this;
}
}

View file

@ -15,12 +15,11 @@
package software.amazon.kinesis.leases;
import java.util.List;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardFilter;
import software.amazon.kinesis.common.StreamIdentifier;
import java.util.List;
/**
*
*/
@ -34,5 +33,4 @@ public interface ShardDetector {
default StreamIdentifier streamIdentifier() {
throw new UnsupportedOperationException("StreamName not available");
}
}

View file

@ -17,7 +17,7 @@ package software.amazon.kinesis.leases.dynamodb;
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import lombok.Data;
import lombok.NonNull;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
@ -61,6 +61,8 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
@NonNull
private StreamConfig streamConfig;
private Function<StreamConfig, ShardDetector> customShardDetectorProvider;
private final long failoverTimeMillis;
private final long epsilonMillis;
private final int maxLeasesForWorker;
@ -365,7 +367,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
* @param dynamoDbRequestTimeout
* @param billingMode
*/
public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final StreamConfig streamConfig,
private DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final StreamConfig streamConfig,
final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier,
final ExecutorService executorService, final long failoverTimeMillis, final long epsilonMillis,
final int maxLeasesForWorker, final int maxLeasesToStealAtOneTime, final int maxLeaseRenewalThreads,
@ -382,7 +384,8 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity,
hierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode, leaseSerializer);
hierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode, leaseSerializer,
null);
this.streamConfig = streamConfig;
}
@ -425,7 +428,8 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus,
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity,
final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
Duration dynamoDbRequestTimeout, BillingMode billingMode, LeaseSerializer leaseSerializer) {
Duration dynamoDbRequestTimeout, BillingMode billingMode, LeaseSerializer leaseSerializer,
Function<StreamConfig, ShardDetector> customShardDetectorProvider) {
this.kinesisClient = kinesisClient;
this.dynamoDBClient = dynamoDBClient;
this.tableName = tableName;
@ -452,6 +456,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
this.dynamoDbRequestTimeout = dynamoDbRequestTimeout;
this.billingMode = billingMode;
this.leaseSerializer = leaseSerializer;
this.customShardDetectorProvider = customShardDetectorProvider;
}
@Override
@ -522,7 +527,8 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
*/
@Override
public ShardDetector createShardDetector(StreamConfig streamConfig) {
return new KinesisShardDetector(kinesisClient, streamConfig.streamIdentifier(), listShardsBackoffTimeMillis,
return customShardDetectorProvider != null ? customShardDetectorProvider.apply(streamConfig) :
new KinesisShardDetector(kinesisClient, streamConfig.streamIdentifier(), listShardsBackoffTimeMillis,
maxListShardsRetryAttempts, listShardsCacheAllowedAgeInSeconds, maxCacheMissesBeforeReload,
cacheMissWarningModulus, dynamoDbRequestTimeout);
}

View file

@ -16,6 +16,8 @@ package software.amazon.kinesis.lifecycle;
import com.google.common.annotations.VisibleForTesting;
import java.util.List;
import java.util.function.Function;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@ -26,11 +28,11 @@ import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.leases.exceptions.CustomerApplicationException;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
@ -38,16 +40,12 @@ import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.MetricsScope;
import software.amazon.kinesis.metrics.MetricsLevel;
import software.amazon.kinesis.metrics.MetricsScope;
import software.amazon.kinesis.metrics.MetricsUtil;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import java.util.List;
import java.util.function.Function;
/**
* Task for invoking the ShardRecordProcessor shutdown() callback.
*/

View file

@ -57,6 +57,7 @@ public class ProcessRecordsInput {
* The records received from Kinesis. These records may have been de-aggregated if they were published by the KPL.
*/
private List<KinesisClientRecord> records;
/**
* A checkpointer that the {@link ShardRecordProcessor} can use to checkpoint its progress.
*/

View file

@ -0,0 +1,48 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.retrieval;
import java.time.Duration;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.metrics.MetricsFactory;
public interface DataFetcherProviderConfig {
/**
* Gets stream identifier for dataFetcher.
*/
StreamIdentifier getStreamIdentifier();
/**
* Gets shard id.
*/
String getShardId();
/**
* Gets current instance of metrics factory.
*/
MetricsFactory getMetricsFactory();
/**
* Gets current max records allowed to process at a given time.
*/
Integer getMaxRecords();
/**
* Gets timeout for kinesis request.
*/
Duration getKinesisRequestTimeout();
}

View file

@ -14,7 +14,9 @@
*/
package software.amazon.kinesis.retrieval;
import java.util.Optional;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.kinesis.retrieval.polling.DataFetcher;
import software.amazon.kinesis.retrieval.polling.KinesisDataFetcher;
/**
@ -47,9 +49,27 @@ public interface GetRecordsRetrievalStrategy {
boolean isShutdown();
/**
* Returns the KinesisDataFetcher used to records from Kinesis.
* Returns a DataFetcher used to records from Kinesis.
*
* @return KinesisDataFetcher
* @return DataFetcher
*/
KinesisDataFetcher getDataFetcher();
/**
* Returns a DataFetcher override if applicable, else empty for retrieving records from Kinesis.
*
* @return Optional<DataFetcher>
*/
default Optional<DataFetcher> getDataFetcherOverride() {
return Optional.empty();
}
/**
* Returns a dataFetcher by first checking for an override if it exists, else using the default data fetcher.
*
* @return DataFetcher
*/
default DataFetcher dataFetcher() {
return getDataFetcherOverride().orElse(getDataFetcher());
}
}

View file

@ -0,0 +1,45 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.retrieval;
import java.time.Duration;
import lombok.Data;
import lombok.NonNull;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.metrics.MetricsFactory;
/**
* Configuration needed for custom data fetchers
*/
@Data
public class KinesisDataFetcherProviderConfig implements DataFetcherProviderConfig {
@NonNull
private StreamIdentifier streamIdentifier;
@NonNull
private String shardId;
@NonNull
private MetricsFactory metricsFactory;
@NonNull
private Integer maxRecords;
@NonNull
private Duration kinesisRequestTimeout;
}

View file

@ -29,11 +29,15 @@ import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.processor.MultiStreamTracker;
import software.amazon.kinesis.retrieval.fanout.FanOutConfig;
import software.amazon.kinesis.retrieval.polling.PollingConfig;
/**
* Used by the KCL to configure the retrieval of records from Kinesis.
*/
@Getter @Setter @ToString @EqualsAndHashCode
@Getter
@Setter
@ToString
@EqualsAndHashCode
@Accessors(fluent = true)
public class RetrievalConfig {
/**
@ -52,6 +56,7 @@ public class RetrievalConfig {
@NonNull
private final String applicationName;
/**
* AppStreamTracker either for multi stream tracking or single stream
*/
@ -117,17 +122,29 @@ public class RetrievalConfig {
}
public RetrievalFactory retrievalFactory() {
if (retrievalFactory == null) {
if (retrievalSpecificConfig == null) {
retrievalSpecificConfig = new FanOutConfig(kinesisClient())
.applicationName(applicationName());
retrievalSpecificConfig = appStreamTracker.map(multiStreamTracker -> retrievalSpecificConfig,
streamConfig -> ((FanOutConfig)retrievalSpecificConfig).streamName(streamConfig.streamIdentifier().streamName()));
streamConfig -> ((FanOutConfig) retrievalSpecificConfig).streamName(streamConfig.streamIdentifier().streamName()));
}
retrievalFactory = retrievalSpecificConfig.retrievalFactory();
}
validateConfig();
return retrievalFactory;
}
private void validateConfig() {
boolean isPollingConfig = retrievalSpecificConfig instanceof PollingConfig;
boolean isInvalidPollingConfig = isPollingConfig && appStreamTracker.map(multiStreamTracker ->
((PollingConfig) retrievalSpecificConfig).streamName() != null,
streamConfig ->
streamConfig.streamIdentifier() == null || streamConfig.streamIdentifier().streamName() == null);
if(isInvalidPollingConfig) {
throw new IllegalArgumentException("Invalid config: multistream enabled with streamName or single stream with no streamName");
}
}
}

View file

@ -15,6 +15,9 @@
package software.amazon.kinesis.retrieval;
import java.util.function.Function;
import software.amazon.kinesis.retrieval.polling.DataFetcher;
public interface RetrievalSpecificConfig {
/**
* Creates and returns a retrieval factory for the specific configuration

View file

@ -0,0 +1,124 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.retrieval.polling;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import lombok.NonNull;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.retrieval.DataFetcherResult;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
public interface DataFetcher {
/**
* Get records from the current position in the stream (up to maxRecords).
*
* @return list of records of up to maxRecords size
*/
DataFetcherResult getRecords();
/**
* Initializes this KinesisDataFetcher's iterator based on the checkpointed sequence number.
*
* @param initialCheckpoint Current checkpoint sequence number for this shard.
* @param initialPositionInStream The initialPositionInStream.
*/
void initialize(String initialCheckpoint,
InitialPositionInStreamExtended initialPositionInStream);
/**
* Initializes this KinesisDataFetcher's iterator based on the checkpointed sequence number as an
* ExtendedSequenceNumber.
*
* @param initialCheckpoint Current checkpoint sequence number for this shard.
* @param initialPositionInStream The initialPositionInStream.
*/
void initialize(ExtendedSequenceNumber initialCheckpoint,
InitialPositionInStreamExtended initialPositionInStream);
/**
* Advances this KinesisDataFetcher's internal iterator to be at the passed-in sequence number.
*
* @param sequenceNumber advance the iterator to the record at this sequence number.
* @param initialPositionInStream The initialPositionInStream.
*/
void advanceIteratorTo(String sequenceNumber,
InitialPositionInStreamExtended initialPositionInStream);
/**
* Gets a new iterator from the last known sequence number i.e. the sequence number of the last record from the last
* records call.
*/
void restartIterator();
/**
* Resets the iterator by setting shardIterator, sequenceNumber and the position in the stream.
*
* @param shardIterator set the current shard iterator.
* @param sequenceNumber reset the iterator to the record at this sequence number.
* @param initialPositionInStream the current position in the stream to reset the iterator to.
*/
void resetIterator(String shardIterator, String sequenceNumber, InitialPositionInStreamExtended initialPositionInStream);
/**
* Retrieves the response based on the request.
*
* @param request the current get records request used to receive a response.
* @return GetRecordsResponse response for getRecords
*/
GetRecordsResponse getGetRecordsResponse(GetRecordsRequest request) throws Exception;
/**
* Retrieves the next get records request based on the current iterator.
*
* @param nextIterator specify the iterator to get the next record request
* @return {@link GetRecordsRequest}
*/
GetRecordsRequest getGetRecordsRequest(String nextIterator);
/**
* Gets the next iterator based on the request.
*
* @param request used to obtain the next shard iterator
* @return next iterator string
*/
String getNextIterator(GetShardIteratorRequest request) throws ExecutionException, InterruptedException, TimeoutException;
/**
* Gets the next set of records based on the iterator.
*
* @param nextIterator specified shard iterator for getting the next set of records
* @return {@link GetRecordsResponse}
*/
GetRecordsResponse getRecords(@NonNull String nextIterator);
/**
* Get the current account and stream information.
*
* @return {@link StreamIdentifier}
*/
StreamIdentifier getStreamIdentifier();
/**
* Checks if shardEnd is reached.
* @return boolean to determine whether shard end is reached
*/
boolean isShardEndReached();
}

View file

@ -14,20 +14,18 @@
*/
package software.amazon.kinesis.retrieval.polling;
import com.google.common.collect.Iterables;
import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang3.StringUtils;
import com.google.common.collect.Iterables;
import lombok.AccessLevel;
import lombok.Data;
import lombok.Getter;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
@ -47,8 +45,10 @@ import software.amazon.kinesis.metrics.MetricsLevel;
import software.amazon.kinesis.metrics.MetricsScope;
import software.amazon.kinesis.metrics.MetricsUtil;
import software.amazon.kinesis.retrieval.AWSExceptionManager;
import software.amazon.kinesis.retrieval.DataFetcherProviderConfig;
import software.amazon.kinesis.retrieval.DataFetcherResult;
import software.amazon.kinesis.retrieval.IteratorBuilder;
import software.amazon.kinesis.retrieval.KinesisDataFetcherProviderConfig;
import software.amazon.kinesis.retrieval.RetryableRetrievalException;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
@ -57,7 +57,7 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
*/
@Slf4j
@KinesisClientInternalApi
public class KinesisDataFetcher {
public class KinesisDataFetcher implements DataFetcher {
private static final String METRICS_PREFIX = "KinesisDataFetcher";
private static final String OPERATION = "ProcessTask";
@ -76,33 +76,39 @@ public class KinesisDataFetcher {
@Deprecated
public KinesisDataFetcher(KinesisAsyncClient kinesisClient, String streamName, String shardId, int maxRecords, MetricsFactory metricsFactory) {
this(kinesisClient, StreamIdentifier.singleStreamInstance(streamName), shardId, maxRecords, metricsFactory, PollingConfig.DEFAULT_REQUEST_TIMEOUT);
this(kinesisClient, new KinesisDataFetcherProviderConfig(
StreamIdentifier.singleStreamInstance(streamName),
shardId,
metricsFactory,
maxRecords,
PollingConfig.DEFAULT_REQUEST_TIMEOUT
));
}
/**
* Constructs KinesisDataFetcher.
* @param kinesisClient
* @param streamIdentifier
* @param shardId
* @param maxRecords
* @param metricsFactory
* @param maxFutureWait
*/
public KinesisDataFetcher(KinesisAsyncClient kinesisClient, StreamIdentifier streamIdentifier, String shardId, int maxRecords, MetricsFactory metricsFactory, Duration maxFutureWait) {
this.kinesisClient = kinesisClient;
this.streamIdentifier = streamIdentifier;
this.shardId = shardId;
this.maxRecords = maxRecords;
this.metricsFactory = metricsFactory;
this.maxFutureWait = maxFutureWait;
this.streamAndShardId = streamIdentifier.serialize() + ":" + shardId;
}
/** Note: This method has package level access for testing purposes.
* Note: This method has package level access for testing purposes.
*
* @return nextIterator
*/
@Getter(AccessLevel.PACKAGE)
private String nextIterator;
/**
* Constructs KinesisDataFetcher.
*
* @param kinesisClient
* @param kinesisDataFetcherProviderConfig
*/
public KinesisDataFetcher(KinesisAsyncClient kinesisClient, DataFetcherProviderConfig kinesisDataFetcherProviderConfig) {
this.kinesisClient = kinesisClient;
this.maxFutureWait = kinesisDataFetcherProviderConfig.getKinesisRequestTimeout();
this.maxRecords = kinesisDataFetcherProviderConfig.getMaxRecords();
this.metricsFactory = kinesisDataFetcherProviderConfig.getMetricsFactory();
this.shardId = kinesisDataFetcherProviderConfig.getShardId();
this.streamIdentifier = kinesisDataFetcherProviderConfig.getStreamIdentifier();
this.streamAndShardId = streamIdentifier.serialize() + ":" + shardId;
}
@Getter
private boolean isShardEndReached;
private boolean isInitialized;
@ -114,6 +120,7 @@ public class KinesisDataFetcher {
*
* @return list of records of up to maxRecords size
*/
@Override
public DataFetcherResult getRecords() {
if (!isInitialized) {
throw new IllegalArgumentException("KinesisDataFetcher.records called before initialization.");
@ -187,6 +194,7 @@ public class KinesisDataFetcher {
* @param initialCheckpoint Current checkpoint sequence number for this shard.
* @param initialPositionInStream The initialPositionInStream.
*/
@Override
public void initialize(final String initialCheckpoint,
final InitialPositionInStreamExtended initialPositionInStream) {
log.info("Initializing shard {} with {}", streamAndShardId, initialCheckpoint);
@ -194,6 +202,7 @@ public class KinesisDataFetcher {
isInitialized = true;
}
@Override
public void initialize(final ExtendedSequenceNumber initialCheckpoint,
final InitialPositionInStreamExtended initialPositionInStream) {
log.info("Initializing shard {} with {}", streamAndShardId, initialCheckpoint.sequenceNumber());
@ -207,6 +216,7 @@ public class KinesisDataFetcher {
* @param sequenceNumber advance the iterator to the record at this sequence number.
* @param initialPositionInStream The initialPositionInStream.
*/
@Override
public void advanceIteratorTo(final String sequenceNumber,
final InitialPositionInStreamExtended initialPositionInStream) {
if (sequenceNumber == null) {
@ -228,9 +238,7 @@ public class KinesisDataFetcher {
try {
try {
final GetShardIteratorResponse result = FutureUtils
.resolveOrCancelFuture(kinesisClient.getShardIterator(request), maxFutureWait);
nextIterator = result.shardIterator();
nextIterator = getNextIterator(request);
success = true;
} catch (ExecutionException e) {
throw exceptionManager.apply(e.getCause());
@ -260,6 +268,7 @@ public class KinesisDataFetcher {
* Gets a new iterator from the last known sequence number i.e. the sequence number of the last record from the last
* records call.
*/
@Override
public void restartIterator() {
if (StringUtils.isEmpty(lastKnownSequenceNumber) || initialPositionInStream == null) {
throw new IllegalStateException(
@ -268,22 +277,15 @@ public class KinesisDataFetcher {
advanceIteratorTo(lastKnownSequenceNumber, initialPositionInStream);
}
@Override
public void resetIterator(String shardIterator, String sequenceNumber, InitialPositionInStreamExtended initialPositionInStream) {
this.nextIterator = shardIterator;
this.lastKnownSequenceNumber = sequenceNumber;
this.initialPositionInStream = initialPositionInStream;
}
private GetRecordsResponse getRecords(@NonNull final String nextIterator) {
final AWSExceptionManager exceptionManager = createExceptionManager();
GetRecordsRequest request = KinesisRequestsBuilder.getRecordsRequestBuilder().shardIterator(nextIterator)
.limit(maxRecords).build();
final MetricsScope metricsScope = MetricsUtil.createMetricsWithOperation(metricsFactory, OPERATION);
MetricsUtil.addShardId(metricsScope, shardId);
boolean success = false;
long startTime = System.currentTimeMillis();
try {
@Override
public GetRecordsResponse getGetRecordsResponse(GetRecordsRequest request) throws ExecutionException, InterruptedException, TimeoutException {
final GetRecordsResponse response = FutureUtils.resolveOrCancelFuture(kinesisClient.getRecords(request),
maxFutureWait);
if (!isValidResponse(response)) {
@ -291,6 +293,33 @@ public class KinesisDataFetcher {
+ ". nextShardIterator: " + response.nextShardIterator()
+ ". childShards: " + response.childShards() + ". Will retry GetRecords with the same nextIterator.");
}
return response;
}
@Override
public GetRecordsRequest getGetRecordsRequest(String nextIterator) {
return KinesisRequestsBuilder.getRecordsRequestBuilder().shardIterator(nextIterator)
.limit(maxRecords).build();
}
@Override
public String getNextIterator(GetShardIteratorRequest request) throws ExecutionException, InterruptedException, TimeoutException {
final GetShardIteratorResponse result = FutureUtils
.resolveOrCancelFuture(kinesisClient.getShardIterator(request), maxFutureWait);
return result.shardIterator();
}
@Override
public GetRecordsResponse getRecords(@NonNull final String nextIterator) {
final AWSExceptionManager exceptionManager = createExceptionManager();
GetRecordsRequest request = getGetRecordsRequest(nextIterator);
final MetricsScope metricsScope = MetricsUtil.createMetricsWithOperation(metricsFactory, OPERATION);
MetricsUtil.addShardId(metricsScope, shardId);
boolean success = false ;
long startTime = System.currentTimeMillis();
try {
final GetRecordsResponse response = getGetRecordsResponse(request);
success = true;
return response;
} catch (ExecutionException e) {

View file

@ -17,31 +17,47 @@ package software.amazon.kinesis.retrieval.polling;
import java.time.Duration;
import java.util.Optional;
import java.util.function.Function;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NonNull;
import lombok.Setter;
import lombok.ToString;
import lombok.experimental.Accessors;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.kinesis.retrieval.DataFetcherProviderConfig;
import software.amazon.kinesis.retrieval.RecordsFetcherFactory;
import software.amazon.kinesis.retrieval.RetrievalFactory;
import software.amazon.kinesis.retrieval.RetrievalSpecificConfig;
@Accessors(fluent = true)
@Data
@Getter
@Setter
@ToString
@EqualsAndHashCode
public class PollingConfig implements RetrievalSpecificConfig {
public static final Duration DEFAULT_REQUEST_TIMEOUT = Duration.ofSeconds(30);
/**
* Configurable functional interface to override the existing DataFetcher.
*/
Function<DataFetcherProviderConfig, DataFetcher> dataFetcherProvider;
/**
* Name of the Kinesis stream.
*
* @return String
*/
@NonNull
private final String streamName;
private String streamName;
/**
* @param kinesisClient Client used to access Kinesis services.
*/
public PollingConfig(KinesisAsyncClient kinesisClient) {
this.kinesisClient = kinesisClient;
}
/**
* Client used to access to Kinesis service.
@ -60,6 +76,15 @@ public class PollingConfig implements RetrievalSpecificConfig {
*/
private int maxRecords = 10000;
/**
* @param streamName Name of Kinesis stream.
* @param kinesisClient Client used to access Kinesis serivces.
*/
public PollingConfig(String streamName, KinesisAsyncClient kinesisClient) {
this.kinesisClient = kinesisClient;
this.streamName = streamName;
}
/**
* The value for how long the ShardConsumer should sleep if no records are returned from the call to
* {@link KinesisAsyncClient#getRecords(GetRecordsRequest)}.
@ -105,6 +130,6 @@ public class PollingConfig implements RetrievalSpecificConfig {
@Override
public RetrievalFactory retrievalFactory() {
return new SynchronousBlockingRetrievalFactory(streamName(), kinesisClient(), recordsFetcherFactory,
maxRecords(), kinesisRequestTimeout);
maxRecords(), kinesisRequestTimeout, dataFetcherProvider);
}
}

View file

@ -15,6 +15,8 @@
package software.amazon.kinesis.retrieval.polling;
import com.google.common.annotations.VisibleForTesting;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
@ -25,21 +27,17 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import lombok.AccessLevel;
import lombok.Data;
import lombok.Getter;
import lombok.NonNull;
import lombok.Setter;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import com.google.common.annotations.VisibleForTesting;
import lombok.Data;
import lombok.NonNull;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.cloudwatch.model.StandardUnit;
import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
@ -61,7 +59,6 @@ import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RecordsRetrieved;
import software.amazon.kinesis.retrieval.RetryableRetrievalException;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import static software.amazon.kinesis.common.DiagnosticUtils.takeDelayedDeliveryActionIfRequired;
/**
@ -108,7 +105,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
@VisibleForTesting @Getter
private final LinkedBlockingQueue<PrefetchRecordsRetrieved> prefetchRecordsQueue;
private final PrefetchCounters prefetchCounters;
private final KinesisDataFetcher dataFetcher;
private final DataFetcher dataFetcher;
private InitialPositionInStreamExtended initialPositionInStreamExtended;
private String highestSequenceNumber;
@ -215,7 +212,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
this.maxByteSize = maxByteSize;
this.maxRecordsCount = maxRecordsCount;
this.publisherSession = new PublisherSession(new LinkedBlockingQueue<>(this.maxPendingProcessRecordsInput),
new PrefetchCounters(), this.getRecordsRetrievalStrategy.getDataFetcher());
new PrefetchCounters(), this.getRecordsRetrievalStrategy.dataFetcher());
this.executorService = executorService;
this.metricsFactory = new ThreadSafeMetricsDelegatingFactory(metricsFactory);
this.idleMillisBetweenCalls = idleMillisBetweenCalls;
@ -223,7 +220,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
Validate.notEmpty(operation, "Operation cannot be empty");
this.operation = operation;
this.streamAndShardId =
this.getRecordsRetrievalStrategy.getDataFetcher().getStreamIdentifier().serialize() + ":" + shardId;
this.getRecordsRetrievalStrategy.dataFetcher().getStreamIdentifier().serialize() + ":" + shardId;
}
@Override

View file

@ -15,6 +15,8 @@
package software.amazon.kinesis.retrieval.polling;
import java.time.Duration;
import java.util.function.Function;
import lombok.Data;
import lombok.NonNull;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
@ -22,13 +24,13 @@ import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.retrieval.DataFetcherProviderConfig;
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
import software.amazon.kinesis.retrieval.KinesisDataFetcherProviderConfig;
import software.amazon.kinesis.retrieval.RecordsFetcherFactory;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RetrievalFactory;
import java.time.Duration;
/**
*
*/
@ -42,32 +44,71 @@ public class SynchronousBlockingRetrievalFactory implements RetrievalFactory {
private final KinesisAsyncClient kinesisClient;
@NonNull
private final RecordsFetcherFactory recordsFetcherFactory;
// private final long listShardsBackoffTimeInMillis;
// private final int maxListShardsRetryAttempts;
private final int maxRecords;
private final Duration kinesisRequestTimeout;
public SynchronousBlockingRetrievalFactory(String streamName, KinesisAsyncClient kinesisClient, RecordsFetcherFactory recordsFetcherFactory, int maxRecords, Duration kinesisRequestTimeout) {
private final Function<DataFetcherProviderConfig, DataFetcher> dataFetcherProvider;
@Deprecated
public SynchronousBlockingRetrievalFactory(String streamName,
KinesisAsyncClient kinesisClient,
RecordsFetcherFactory recordsFetcherFactory,
int maxRecords,
Duration kinesisRequestTimeout) {
this(streamName,
kinesisClient,
recordsFetcherFactory,
maxRecords,
kinesisRequestTimeout,
defaultDataFetcherProvider(kinesisClient));
}
public SynchronousBlockingRetrievalFactory(String streamName,
KinesisAsyncClient kinesisClient,
RecordsFetcherFactory recordsFetcherFactory,
int maxRecords,
Duration kinesisRequestTimeout,
Function<DataFetcherProviderConfig, DataFetcher> dataFetcherProvider) {
this.streamName = streamName;
this.kinesisClient = kinesisClient;
this.recordsFetcherFactory = recordsFetcherFactory;
this.maxRecords = maxRecords;
this.kinesisRequestTimeout = kinesisRequestTimeout;
this.dataFetcherProvider = dataFetcherProvider == null ?
defaultDataFetcherProvider(kinesisClient) : dataFetcherProvider;
}
@Deprecated
public SynchronousBlockingRetrievalFactory(String streamName, KinesisAsyncClient kinesisClient, RecordsFetcherFactory recordsFetcherFactory, int maxRecords) {
public SynchronousBlockingRetrievalFactory(String streamName,
KinesisAsyncClient kinesisClient,
RecordsFetcherFactory recordsFetcherFactory,
int maxRecords) {
this(streamName, kinesisClient, recordsFetcherFactory, maxRecords, PollingConfig.DEFAULT_REQUEST_TIMEOUT);
}
private static Function<DataFetcherProviderConfig, DataFetcher> defaultDataFetcherProvider(
KinesisAsyncClient kinesisClient) {
return dataFetcherProviderConfig -> new KinesisDataFetcher(kinesisClient, dataFetcherProviderConfig);
}
@Override
public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(@NonNull final ShardInfo shardInfo,
@NonNull final MetricsFactory metricsFactory) {
final StreamIdentifier streamIdentifier = shardInfo.streamIdentifierSerOpt().isPresent() ?
StreamIdentifier.multiStreamInstance(shardInfo.streamIdentifierSerOpt().get()) :
StreamIdentifier.singleStreamInstance(streamName);
return new SynchronousGetRecordsRetrievalStrategy(
new KinesisDataFetcher(kinesisClient, streamIdentifier, shardInfo.shardId(), maxRecords, metricsFactory, kinesisRequestTimeout));
final DataFetcherProviderConfig kinesisDataFetcherProviderConfig = new KinesisDataFetcherProviderConfig(
streamIdentifier,
shardInfo.shardId(),
metricsFactory,
maxRecords,
kinesisRequestTimeout);
final DataFetcher dataFetcher = this.dataFetcherProvider.apply(kinesisDataFetcherProviderConfig);
return new SynchronousGetRecordsRetrievalStrategy(dataFetcher);
}
@Override

View file

@ -14,6 +14,7 @@
*/
package software.amazon.kinesis.retrieval.polling;
import java.util.Optional;
import lombok.Data;
import lombok.NonNull;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
@ -26,8 +27,9 @@ import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
@Data
@KinesisClientInternalApi
public class SynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrievalStrategy {
@NonNull
private final KinesisDataFetcher dataFetcher;
private final DataFetcher dataFetcher;
@Override
public GetRecordsResponse getRecords(final int maxRecords) {
@ -48,6 +50,11 @@ public class SynchronousGetRecordsRetrievalStrategy implements GetRecordsRetriev
@Override
public KinesisDataFetcher getDataFetcher() {
throw new UnsupportedOperationException("Deprecated. Use dataFetcher() to retrieve a dataFetcher");
}
@Override
public DataFetcher dataFetcher() {
return dataFetcher;
}
}

View file

@ -17,7 +17,6 @@ package software.amazon.kinesis.retrieval.polling;
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import lombok.NonNull;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
@ -25,6 +24,7 @@ import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
import software.amazon.kinesis.retrieval.KinesisDataFetcherProviderConfig;
import software.amazon.kinesis.retrieval.RecordsFetcherFactory;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RetrievalFactory;
@ -71,9 +71,15 @@ public class SynchronousPrefetchingRetrievalFactory implements RetrievalFactory
final StreamIdentifier streamIdentifier = shardInfo.streamIdentifierSerOpt().isPresent() ?
StreamIdentifier.multiStreamInstance(shardInfo.streamIdentifierSerOpt().get()) :
StreamIdentifier.singleStreamInstance(streamName);
return new SynchronousGetRecordsRetrievalStrategy(
new KinesisDataFetcher(kinesisClient, streamIdentifier, shardInfo.shardId(),
maxRecords, metricsFactory, maxFutureWait));
new KinesisDataFetcher(kinesisClient, new KinesisDataFetcherProviderConfig(
streamIdentifier,
shardInfo.shardId(),
metricsFactory,
maxRecords,
maxFutureWait
)));
}
@Override

View file

@ -107,7 +107,7 @@ public class PrefetchRecordsPublisherTest {
@Mock
private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
@Mock
private KinesisDataFetcher dataFetcher;
private DataFetcher dataFetcher;
@Mock
private InitialPositionInStreamExtended initialPosition;
@Mock
@ -124,7 +124,7 @@ public class PrefetchRecordsPublisherTest {
@Before
public void setup() {
when(getRecordsRetrievalStrategy.getDataFetcher()).thenReturn(dataFetcher);
when(getRecordsRetrievalStrategy.dataFetcher()).thenReturn(dataFetcher);
when(dataFetcher.getStreamIdentifier()).thenReturn(StreamIdentifier.singleStreamInstance("testStream"));
executorService = spy(Executors.newFixedThreadPool(1));
getRecordsCache = new PrefetchRecordsPublisher(

View file

@ -40,14 +40,14 @@ public class RecordsFetcherFactoryTest {
@Mock
private MetricsFactory metricsFactory;
@Mock
private KinesisDataFetcher kinesisDataFetcher;
private DataFetcher dataFetcher;
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
recordsFetcherFactory = new SimpleRecordsFetcherFactory();
when(getRecordsRetrievalStrategy.getDataFetcher()).thenReturn(kinesisDataFetcher);
when(kinesisDataFetcher.getStreamIdentifier()).thenReturn(StreamIdentifier.singleStreamInstance("stream"));
when(getRecordsRetrievalStrategy.dataFetcher()).thenReturn(dataFetcher);
when(dataFetcher.getStreamIdentifier()).thenReturn(StreamIdentifier.singleStreamInstance("stream"));
}
@Test
@ -66,5 +66,4 @@ public class RecordsFetcherFactoryTest {
metricsFactory, 1);
assertThat(recordsCache, instanceOf(PrefetchRecordsPublisher.class));
}
}