From a97a617f359914379b2b5595662dd82ac3578a54 Mon Sep 17 00:00:00 2001 From: Renju Radhakrishnan Date: Fri, 17 Apr 2020 13:20:41 -0700 Subject: [PATCH] Add DataFetcher and ShardDetector interface changes (#9) --- .../amazon/kinesis/coordinator/Scheduler.java | 40 +++--- .../kinesis/leases/KinesisShardDetector.java | 4 +- .../kinesis/leases/LeaseManagementConfig.java | 115 ++++++++-------- .../amazon/kinesis/leases/ShardDetector.java | 4 +- .../DynamoDBLeaseManagementFactory.java | 22 ++-- .../kinesis/lifecycle/ShutdownTask.java | 10 +- .../lifecycle/events/ProcessRecordsInput.java | 1 + .../retrieval/DataFetcherProviderConfig.java | 48 +++++++ .../GetRecordsRetrievalStrategy.java | 28 +++- .../KinesisDataFetcherProviderConfig.java | 45 +++++++ .../kinesis/retrieval/RetrievalConfig.java | 27 +++- .../retrieval/RetrievalSpecificConfig.java | 5 +- .../retrieval/polling/DataFetcher.java | 124 ++++++++++++++++++ .../retrieval/polling/KinesisDataFetcher.java | 109 +++++++++------ .../retrieval/polling/PollingConfig.java | 35 ++++- .../polling/PrefetchRecordsPublisher.java | 21 ++- .../SynchronousBlockingRetrievalFactory.java | 59 +++++++-- ...ynchronousGetRecordsRetrievalStrategy.java | 11 +- ...ynchronousPrefetchingRetrievalFactory.java | 12 +- .../polling/PrefetchRecordsPublisherTest.java | 4 +- .../polling/RecordsFetcherFactoryTest.java | 7 +- 21 files changed, 545 insertions(+), 186 deletions(-) create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/DataFetcherProviderConfig.java create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/KinesisDataFetcherProviderConfig.java create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/DataFetcher.java diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index cb4b4579..4d6c1fd7 100644 
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -15,6 +15,10 @@ package software.amazon.kinesis.coordinator; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Stopwatch; + +import io.reactivex.plugins.RxJavaPlugins; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -29,14 +33,12 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Stopwatch; -import io.reactivex.plugins.RxJavaPlugins; import lombok.AccessLevel; import lombok.Getter; import lombok.NoArgsConstructor; @@ -50,6 +52,7 @@ import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.StreamConfig; import software.amazon.kinesis.common.StreamIdentifier; +import software.amazon.kinesis.leases.HierarchicalShardSyncer; import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseManagementConfig; @@ -59,9 +62,7 @@ import software.amazon.kinesis.leases.MultiStreamLease; import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.ShardPrioritization; -import software.amazon.kinesis.leases.ShardSyncTask; import software.amazon.kinesis.leases.ShardSyncTaskManager; -import 
software.amazon.kinesis.leases.HierarchicalShardSyncer; import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseCoordinator; import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseSerializer; import software.amazon.kinesis.leases.dynamodb.DynamoDBMultiStreamLeaseSerializer; @@ -77,7 +78,6 @@ import software.amazon.kinesis.lifecycle.ShutdownNotification; import software.amazon.kinesis.lifecycle.ShutdownReason; import software.amazon.kinesis.lifecycle.TaskResult; import software.amazon.kinesis.metrics.CloudWatchMetricsFactory; -import software.amazon.kinesis.metrics.MetricsCollectingTaskDecorator; import software.amazon.kinesis.metrics.MetricsConfig; import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.processor.Checkpointer; @@ -89,9 +89,6 @@ import software.amazon.kinesis.retrieval.AggregatorUtil; import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RetrievalConfig; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadLocalRandom; - /** * */ @@ -326,6 +323,7 @@ public class Scheduler implements Runnable { if (shouldInitiateLeaseSync()) { log.info("Worker {} is initiating the lease sync.", leaseManagementConfig.workerIdentifier()); leaderElectedPeriodicShardSyncManager.syncShardsOnce(); + } } else { log.info("Skipping shard sync per configuration setting (and lease table is not empty)"); @@ -551,7 +549,6 @@ public class Scheduler implements Runnable { * Requests a graceful shutdown of the worker, notifying record processors, that implement * {@link ShutdownNotificationAware}, of the impending shutdown. This gives the record processor a final chance to * checkpoint. - * * This will only create a single shutdown future. Additional attempts to start a graceful shutdown will return the * previous future. * @@ -578,8 +575,8 @@ public class Scheduler implements Runnable { * * * @return a future that will be set once the shutdown has completed. 
True indicates that the graceful shutdown - * completed successfully. A false value indicates that a non-exception case caused the shutdown process to - * terminate early. + * completed successfully. A false value indicates that a non-exception case caused the shutdown process to + * terminate early. */ public Future startGracefulShutdown() { synchronized (this) { @@ -596,9 +593,8 @@ public class Scheduler implements Runnable { * shutdowns in your own executor, or execute the shutdown synchronously. * * @return a callable that run the graceful shutdown process. This may return a callable that return true if the - * graceful shutdown has already been completed. - * @throws IllegalStateException - * thrown by the callable if another callable has already started the shutdown process. + * graceful shutdown has already been completed. + * @throws IllegalStateException thrown by the callable if another callable has already started the shutdown process. */ public Callable createGracefulShutdownCallable() { if (shutdownComplete()) { @@ -740,12 +736,11 @@ public class Scheduler implements Runnable { /** * NOTE: This method is internal/private to the Worker class. It has package access solely for testing. 
* - * @param shardInfo - * Kinesis shard info + * @param shardInfo Kinesis shard info * @return ShardConsumer for the shard */ ShardConsumer createOrGetShardConsumer(@NonNull final ShardInfo shardInfo, - @NonNull final ShardRecordProcessorFactory shardRecordProcessorFactory) { + @NonNull final ShardRecordProcessorFactory shardRecordProcessorFactory) { ShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo); // Instantiate a new consumer if we don't have one, or the one we // had was from an earlier @@ -766,10 +761,10 @@ public class Scheduler implements Runnable { } protected ShardConsumer buildConsumer(@NonNull final ShardInfo shardInfo, - @NonNull final ShardRecordProcessorFactory shardRecordProcessorFactory) { + @NonNull final ShardRecordProcessorFactory shardRecordProcessorFactory) { RecordsPublisher cache = retrievalConfig.retrievalFactory().createGetRecordsCache(shardInfo, metricsFactory); ShardRecordProcessorCheckpointer checkpointer = coordinatorConfig.coordinatorFactory().createRecordProcessorCheckpointer(shardInfo, - checkpoint); + checkpoint); // The only case where streamName is not available will be when multistreamtracker not set. In this case, // get the default stream name for the single stream application. final StreamIdentifier streamIdentifier = getStreamIdentifier(shardInfo.streamIdentifierSerOpt()); @@ -806,7 +801,6 @@ public class Scheduler implements Runnable { /** * NOTE: This method is internal/private to the Worker class. It has package access solely for testing. - *

* This method relies on ShardInfo.equals() method returning true for ShardInfo objects which may have been * instantiated with parentShardIds in a different order (and rest of the fields being the equal). For example * shardInfo1.equals(shardInfo2) should return true with shardInfo1 and shardInfo2 defined as follows. ShardInfo @@ -851,7 +845,7 @@ public class Scheduler implements Runnable { private StreamIdentifier getStreamIdentifier(Optional streamIdentifierString) { final StreamIdentifier streamIdentifier; - if(streamIdentifierString.isPresent()) { + if (streamIdentifierString.isPresent()) { streamIdentifier = StreamIdentifier.multiStreamInstance(streamIdentifierString.get()); } else { Validate.isTrue(!isMultiStreamMode, "Should not be in MultiStream Mode"); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java index c0c3bdee..a29c5ce4 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java @@ -26,15 +26,13 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.stream.Collectors; - -import org.apache.commons.lang3.StringUtils; - import lombok.AccessLevel; import lombok.Getter; import lombok.NonNull; import lombok.Synchronized; import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.KinesisException; import software.amazon.awssdk.services.kinesis.model.LimitExceededException; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java 
b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java index 2a5a0b1e..acaa8de0 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java @@ -15,17 +15,15 @@ package software.amazon.kinesis.leases; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + import java.time.Duration; -import java.util.HashMap; -import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; - -import com.google.common.util.concurrent.ThreadFactoryBuilder; - +import java.util.function.Function; import lombok.Data; import lombok.NonNull; import lombok.experimental.Accessors; @@ -35,11 +33,11 @@ import software.amazon.awssdk.services.dynamodb.model.BillingMode; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.common.StreamConfig; import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseManagementFactory; import software.amazon.kinesis.leases.dynamodb.TableCreatorCallback; import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.metrics.NullMetricsFactory; -import software.amazon.kinesis.processor.MultiStreamTracker; /** * Used by the KCL to configure lease management. @@ -145,6 +143,11 @@ public class LeaseManagementConfig { */ private int initialLeaseTableWriteCapacity = 10; + /** + * Configurable functional interface to override the existing shardDetector. + */ + private Function customShardDetectorProvider; + /** * The size of the thread pool to create for the lease renewer to use. 
* @@ -291,30 +294,30 @@ public class LeaseManagementConfig { if (leaseManagementFactory == null) { Validate.notEmpty(streamName(), "Stream name is empty"); leaseManagementFactory = new DynamoDBLeaseManagementFactory(kinesisClient(), - streamName(), - dynamoDBClient(), - tableName(), - workerIdentifier(), - executorService(), - initialPositionInStream(), - failoverTimeMillis(), - epsilonMillis(), - maxLeasesForWorker(), - maxLeasesToStealAtOneTime(), - maxLeaseRenewalThreads(), - cleanupLeasesUponShardCompletion(), - ignoreUnexpectedChildShards(), - shardSyncIntervalMillis(), - consistentReads(), - listShardsBackoffTimeInMillis(), - maxListShardsRetryAttempts(), - maxCacheMissesBeforeReload(), - listShardsCacheAllowedAgeInSeconds(), - cacheMissWarningModulus(), - initialLeaseTableReadCapacity(), - initialLeaseTableWriteCapacity(), - hierarchicalShardSyncer(), - tableCreatorCallback(), dynamoDbRequestTimeout(), billingMode()); + streamName(), + dynamoDBClient(), + tableName(), + workerIdentifier(), + executorService(), + initialPositionInStream(), + failoverTimeMillis(), + epsilonMillis(), + maxLeasesForWorker(), + maxLeasesToStealAtOneTime(), + maxLeaseRenewalThreads(), + cleanupLeasesUponShardCompletion(), + ignoreUnexpectedChildShards(), + shardSyncIntervalMillis(), + consistentReads(), + listShardsBackoffTimeInMillis(), + maxListShardsRetryAttempts(), + maxCacheMissesBeforeReload(), + listShardsCacheAllowedAgeInSeconds(), + cacheMissWarningModulus(), + initialLeaseTableReadCapacity(), + initialLeaseTableWriteCapacity(), + hierarchicalShardSyncer(), + tableCreatorCallback(), dynamoDbRequestTimeout(), billingMode()); } return leaseManagementFactory; } @@ -328,31 +331,32 @@ public class LeaseManagementConfig { public LeaseManagementFactory leaseManagementFactory(final LeaseSerializer leaseSerializer, boolean isMultiStreamingMode) { if(leaseManagementFactory == null) { leaseManagementFactory = new DynamoDBLeaseManagementFactory(kinesisClient(), - dynamoDBClient(), - 
tableName(), - workerIdentifier(), - executorService(), - failoverTimeMillis(), - epsilonMillis(), - maxLeasesForWorker(), - maxLeasesToStealAtOneTime(), - maxLeaseRenewalThreads(), - cleanupLeasesUponShardCompletion(), - ignoreUnexpectedChildShards(), - shardSyncIntervalMillis(), - consistentReads(), - listShardsBackoffTimeInMillis(), - maxListShardsRetryAttempts(), - maxCacheMissesBeforeReload(), - listShardsCacheAllowedAgeInSeconds(), - cacheMissWarningModulus(), - initialLeaseTableReadCapacity(), - initialLeaseTableWriteCapacity(), - hierarchicalShardSyncer(isMultiStreamingMode), - tableCreatorCallback(), - dynamoDbRequestTimeout(), - billingMode(), - leaseSerializer); + dynamoDBClient(), + tableName(), + workerIdentifier(), + executorService(), + failoverTimeMillis(), + epsilonMillis(), + maxLeasesForWorker(), + maxLeasesToStealAtOneTime(), + maxLeaseRenewalThreads(), + cleanupLeasesUponShardCompletion(), + ignoreUnexpectedChildShards(), + shardSyncIntervalMillis(), + consistentReads(), + listShardsBackoffTimeInMillis(), + maxListShardsRetryAttempts(), + maxCacheMissesBeforeReload(), + listShardsCacheAllowedAgeInSeconds(), + cacheMissWarningModulus(), + initialLeaseTableReadCapacity(), + initialLeaseTableWriteCapacity(), + hierarchicalShardSyncer(isMultiStreamingMode), + tableCreatorCallback(), + dynamoDbRequestTimeout(), + billingMode(), + leaseSerializer, + customShardDetectorProvider()); } return leaseManagementFactory; } @@ -366,5 +370,4 @@ public class LeaseManagementConfig { this.leaseManagementFactory = leaseManagementFactory; return this; } - } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardDetector.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardDetector.java index 1b2822ee..2967a9fb 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardDetector.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardDetector.java @@ -15,12 
+15,11 @@ package software.amazon.kinesis.leases; +import java.util.List; import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.awssdk.services.kinesis.model.ShardFilter; import software.amazon.kinesis.common.StreamIdentifier; -import java.util.List; - /** * */ @@ -34,5 +33,4 @@ public interface ShardDetector { default StreamIdentifier streamIdentifier() { throw new UnsupportedOperationException("StreamName not available"); } - } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java index 8c09af52..44879c1c 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java @@ -17,7 +17,7 @@ package software.amazon.kinesis.leases.dynamodb; import java.time.Duration; import java.util.concurrent.ExecutorService; - +import java.util.function.Function; import lombok.Data; import lombok.NonNull; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; @@ -61,6 +61,8 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { @NonNull private StreamConfig streamConfig; + private Function customShardDetectorProvider; + private final long failoverTimeMillis; private final long epsilonMillis; private final int maxLeasesForWorker; @@ -231,7 +233,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { /** * Constructor. 
- * + * * @param kinesisClient * @param streamName * @param dynamoDBClient @@ -365,7 +367,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { * @param dynamoDbRequestTimeout * @param billingMode */ - public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final StreamConfig streamConfig, + private DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final StreamConfig streamConfig, final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier, final ExecutorService executorService, final long failoverTimeMillis, final long epsilonMillis, final int maxLeasesForWorker, final int maxLeasesToStealAtOneTime, final int maxLeaseRenewalThreads, @@ -382,7 +384,8 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis, maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds, cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity, - hierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode, leaseSerializer); + hierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode, leaseSerializer, + null); this.streamConfig = streamConfig; } @@ -425,7 +428,8 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus, final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity, final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback, - Duration dynamoDbRequestTimeout, BillingMode billingMode, LeaseSerializer leaseSerializer) { + Duration dynamoDbRequestTimeout, BillingMode billingMode, LeaseSerializer leaseSerializer, + Function customShardDetectorProvider) { 
this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.tableName = tableName; @@ -452,6 +456,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { this.dynamoDbRequestTimeout = dynamoDbRequestTimeout; this.billingMode = billingMode; this.leaseSerializer = leaseSerializer; + this.customShardDetectorProvider = customShardDetectorProvider; } @Override @@ -522,8 +527,9 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { */ @Override public ShardDetector createShardDetector(StreamConfig streamConfig) { - return new KinesisShardDetector(kinesisClient, streamConfig.streamIdentifier(), listShardsBackoffTimeMillis, - maxListShardsRetryAttempts, listShardsCacheAllowedAgeInSeconds, maxCacheMissesBeforeReload, - cacheMissWarningModulus, dynamoDbRequestTimeout); + return customShardDetectorProvider != null ? customShardDetectorProvider.apply(streamConfig) : + new KinesisShardDetector(kinesisClient, streamConfig.streamIdentifier(), listShardsBackoffTimeMillis, + maxListShardsRetryAttempts, listShardsCacheAllowedAgeInSeconds, maxCacheMissesBeforeReload, + cacheMissWarningModulus, dynamoDbRequestTimeout); } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index 33eb4497..aab984eb 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -16,6 +16,8 @@ package software.amazon.kinesis.lifecycle; import com.google.common.annotations.VisibleForTesting; +import java.util.List; +import java.util.function.Function; import lombok.NonNull; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -26,11 +28,11 @@ import software.amazon.awssdk.utils.CollectionUtils; import 
software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.leases.HierarchicalShardSyncer; import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardInfo; -import software.amazon.kinesis.leases.HierarchicalShardSyncer; import software.amazon.kinesis.leases.exceptions.CustomerApplicationException; import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.InvalidStateException; @@ -38,16 +40,12 @@ import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.metrics.MetricsFactory; -import software.amazon.kinesis.metrics.MetricsScope; import software.amazon.kinesis.metrics.MetricsLevel; +import software.amazon.kinesis.metrics.MetricsScope; import software.amazon.kinesis.metrics.MetricsUtil; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; - -import java.util.List; -import java.util.function.Function; - /** * Task for invoking the ShardRecordProcessor shutdown() callback. 
*/ diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/ProcessRecordsInput.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/ProcessRecordsInput.java index 3bfcd514..1ce9239b 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/ProcessRecordsInput.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/events/ProcessRecordsInput.java @@ -57,6 +57,7 @@ public class ProcessRecordsInput { * The records received from Kinesis. These records may have been de-aggregated if they were published by the KPL. */ private List records; + /** * A checkpointer that the {@link ShardRecordProcessor} can use to checkpoint its progress. */ diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/DataFetcherProviderConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/DataFetcherProviderConfig.java new file mode 100644 index 00000000..b5c7b23e --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/DataFetcherProviderConfig.java @@ -0,0 +1,48 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package software.amazon.kinesis.retrieval; + +import java.time.Duration; +import software.amazon.kinesis.common.StreamIdentifier; +import software.amazon.kinesis.metrics.MetricsFactory; + +public interface DataFetcherProviderConfig { + + /** + * Gets stream identifier for dataFetcher. + */ + StreamIdentifier getStreamIdentifier(); + + /** + * Gets shard id. + */ + String getShardId(); + + /** + * Gets current instance of metrics factory. + */ + MetricsFactory getMetricsFactory(); + + /** + * Gets current max records allowed to process at a given time. + */ + Integer getMaxRecords(); + + /** + * Gets timeout for kinesis request. + */ + Duration getKinesisRequestTimeout(); +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/GetRecordsRetrievalStrategy.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/GetRecordsRetrievalStrategy.java index ca0487f3..3ff8e620 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/GetRecordsRetrievalStrategy.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/GetRecordsRetrievalStrategy.java @@ -14,7 +14,9 @@ */ package software.amazon.kinesis.retrieval; +import java.util.Optional; import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; +import software.amazon.kinesis.retrieval.polling.DataFetcher; import software.amazon.kinesis.retrieval.polling.KinesisDataFetcher; /** @@ -41,15 +43,33 @@ public interface GetRecordsRetrievalStrategy { /** * Returns whether this strategy has been shutdown. - * + * * @return true if the strategy has been shutdown, false otherwise. */ boolean isShutdown(); /** - * Returns the KinesisDataFetcher used to records from Kinesis. - * - * @return KinesisDataFetcher + * Returns a DataFetcher used to records from Kinesis. 
+ * + * @return DataFetcher */ KinesisDataFetcher getDataFetcher(); + + /** + * Returns a DataFetcher override if applicable, else empty for retrieving records from Kinesis. + * + * @return Optional + */ + default Optional getDataFetcherOverride() { + return Optional.empty(); + } + + /** + * Returns a dataFetcher by first checking for an override if it exists, else using the default data fetcher. + * + * @return DataFetcher + */ + default DataFetcher dataFetcher() { + return getDataFetcherOverride().orElse(getDataFetcher()); + } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/KinesisDataFetcherProviderConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/KinesisDataFetcherProviderConfig.java new file mode 100644 index 00000000..7cf6cdcf --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/KinesisDataFetcherProviderConfig.java @@ -0,0 +1,45 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package software.amazon.kinesis.retrieval; + +import java.time.Duration; +import lombok.Data; +import lombok.NonNull; +import software.amazon.kinesis.common.StreamIdentifier; +import software.amazon.kinesis.metrics.MetricsFactory; + + +/** + * Configuration needed for custom data fetchers + */ +@Data +public class KinesisDataFetcherProviderConfig implements DataFetcherProviderConfig { + + @NonNull + private StreamIdentifier streamIdentifier; + + @NonNull + private String shardId; + + @NonNull + private MetricsFactory metricsFactory; + + @NonNull + private Integer maxRecords; + + @NonNull + private Duration kinesisRequestTimeout; +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java index 746fdc19..5f22411a 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java @@ -29,11 +29,15 @@ import software.amazon.kinesis.common.StreamConfig; import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.processor.MultiStreamTracker; import software.amazon.kinesis.retrieval.fanout.FanOutConfig; +import software.amazon.kinesis.retrieval.polling.PollingConfig; /** * Used by the KCL to configure the retrieval of records from Kinesis. 
*/ -@Getter @Setter @ToString @EqualsAndHashCode +@Getter +@Setter +@ToString +@EqualsAndHashCode @Accessors(fluent = true) public class RetrievalConfig { /** @@ -52,6 +56,7 @@ public class RetrievalConfig { @NonNull private final String applicationName; + /** * AppStreamTracker either for multi stream tracking or single stream */ @@ -91,7 +96,7 @@ public class RetrievalConfig { private RetrievalFactory retrievalFactory; public RetrievalConfig(@NonNull KinesisAsyncClient kinesisAsyncClient, @NonNull String streamName, - @NonNull String applicationName) { + @NonNull String applicationName) { this.kinesisClient = kinesisAsyncClient; this.appStreamTracker = Either .right(new StreamConfig(StreamIdentifier.singleStreamInstance(streamName), initialPositionInStreamExtended)); @@ -99,7 +104,7 @@ public class RetrievalConfig { } public RetrievalConfig(@NonNull KinesisAsyncClient kinesisAsyncClient, @NonNull MultiStreamTracker multiStreamTracker, - @NonNull String applicationName) { + @NonNull String applicationName) { this.kinesisClient = kinesisAsyncClient; this.appStreamTracker = Either.left(multiStreamTracker); this.applicationName = applicationName; @@ -117,17 +122,29 @@ public class RetrievalConfig { } public RetrievalFactory retrievalFactory() { - if (retrievalFactory == null) { if (retrievalSpecificConfig == null) { retrievalSpecificConfig = new FanOutConfig(kinesisClient()) .applicationName(applicationName()); retrievalSpecificConfig = appStreamTracker.map(multiStreamTracker -> retrievalSpecificConfig, - streamConfig -> ((FanOutConfig)retrievalSpecificConfig).streamName(streamConfig.streamIdentifier().streamName())); + streamConfig -> ((FanOutConfig) retrievalSpecificConfig).streamName(streamConfig.streamIdentifier().streamName())); } + retrievalFactory = retrievalSpecificConfig.retrievalFactory(); } + validateConfig(); return retrievalFactory; } + private void validateConfig() { + boolean isPollingConfig = retrievalSpecificConfig instanceof PollingConfig; + boolean 
isInvalidPollingConfig = isPollingConfig && appStreamTracker.map(multiStreamTracker -> + ((PollingConfig) retrievalSpecificConfig).streamName() != null, + streamConfig -> + streamConfig.streamIdentifier() == null || streamConfig.streamIdentifier().streamName() == null); + + if(isInvalidPollingConfig) { + throw new IllegalArgumentException("Invalid config: multistream enabled with streamName or single stream with no streamName"); + } + } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalSpecificConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalSpecificConfig.java index 5ab982bf..30562994 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalSpecificConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalSpecificConfig.java @@ -15,10 +15,13 @@ package software.amazon.kinesis.retrieval; +import java.util.function.Function; +import software.amazon.kinesis.retrieval.polling.DataFetcher; + public interface RetrievalSpecificConfig { /** * Creates and returns a retrieval factory for the specific configuration - * + * * @return a retrieval factory that can create an appropriate retriever */ RetrievalFactory retrievalFactory(); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/DataFetcher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/DataFetcher.java new file mode 100644 index 00000000..ae1c6f30 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/DataFetcher.java @@ -0,0 +1,124 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package software.amazon.kinesis.retrieval.polling; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import lombok.NonNull; +import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; +import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; +import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; +import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.common.StreamIdentifier; +import software.amazon.kinesis.retrieval.DataFetcherResult; +import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; + +public interface DataFetcher { + /** + * Get records from the current position in the stream (up to maxRecords). + * + * @return list of records of up to maxRecords size + */ + DataFetcherResult getRecords(); + + /** + * Initializes this KinesisDataFetcher's iterator based on the checkpointed sequence number. + * + * @param initialCheckpoint Current checkpoint sequence number for this shard. + * @param initialPositionInStream The initialPositionInStream. + */ + void initialize(String initialCheckpoint, + InitialPositionInStreamExtended initialPositionInStream); + + /** + * Initializes this KinesisDataFetcher's iterator based on the checkpointed sequence number as an + * ExtendedSequenceNumber. + * + * @param initialCheckpoint Current checkpoint sequence number for this shard. + * @param initialPositionInStream The initialPositionInStream. 
+ */ + void initialize(ExtendedSequenceNumber initialCheckpoint, + InitialPositionInStreamExtended initialPositionInStream); + + /** + * Advances this KinesisDataFetcher's internal iterator to be at the passed-in sequence number. + * + * @param sequenceNumber advance the iterator to the record at this sequence number. + * @param initialPositionInStream The initialPositionInStream. + */ + void advanceIteratorTo(String sequenceNumber, + InitialPositionInStreamExtended initialPositionInStream); + + /** + * Gets a new iterator from the last known sequence number i.e. the sequence number of the last record from the last + * records call. + */ + void restartIterator(); + + /** + * Resets the iterator by setting shardIterator, sequenceNumber and the position in the stream. + * + * @param shardIterator set the current shard iterator. + * @param sequenceNumber reset the iterator to the record at this sequence number. + * @param initialPositionInStream the current position in the stream to reset the iterator to. + */ + void resetIterator(String shardIterator, String sequenceNumber, InitialPositionInStreamExtended initialPositionInStream); + + /** + * Retrieves the response based on the request. + * + * @param request the current get records request used to receive a response. + * @return GetRecordsResponse response for getRecords + */ + GetRecordsResponse getGetRecordsResponse(GetRecordsRequest request) throws Exception; + + /** + * Retrieves the next get records request based on the current iterator. + * + * @param nextIterator specify the iterator to get the next record request + * @return {@link GetRecordsRequest} + */ + GetRecordsRequest getGetRecordsRequest(String nextIterator); + + /** + * Gets the next iterator based on the request. 
+ * + * @param request used to obtain the next shard iterator + * @return next iterator string + */ + String getNextIterator(GetShardIteratorRequest request) throws ExecutionException, InterruptedException, TimeoutException; + + /** + * Gets the next set of records based on the iterator. + * + * @param nextIterator specified shard iterator for getting the next set of records + * @return {@link GetRecordsResponse} + */ + GetRecordsResponse getRecords(@NonNull String nextIterator); + + /** + * Get the current account and stream information. + * + * @return {@link StreamIdentifier} + */ + StreamIdentifier getStreamIdentifier(); + + /** + * Checks if shardEnd is reached. + * @return boolean to determine whether shard end is reached + */ + boolean isShardEndReached(); +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java index a96e2134..82b31915 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java @@ -14,20 +14,18 @@ */ package software.amazon.kinesis.retrieval.polling; +import com.google.common.collect.Iterables; + import java.time.Duration; import java.util.Collections; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; - -import org.apache.commons.lang3.StringUtils; - -import com.google.common.collect.Iterables; - import lombok.AccessLevel; import lombok.Data; import lombok.Getter; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; @@ -47,8 +45,10 
@@ import software.amazon.kinesis.metrics.MetricsLevel; import software.amazon.kinesis.metrics.MetricsScope; import software.amazon.kinesis.metrics.MetricsUtil; import software.amazon.kinesis.retrieval.AWSExceptionManager; +import software.amazon.kinesis.retrieval.DataFetcherProviderConfig; import software.amazon.kinesis.retrieval.DataFetcherResult; import software.amazon.kinesis.retrieval.IteratorBuilder; +import software.amazon.kinesis.retrieval.KinesisDataFetcherProviderConfig; import software.amazon.kinesis.retrieval.RetryableRetrievalException; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; @@ -57,7 +57,7 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; */ @Slf4j @KinesisClientInternalApi -public class KinesisDataFetcher { +public class KinesisDataFetcher implements DataFetcher { private static final String METRICS_PREFIX = "KinesisDataFetcher"; private static final String OPERATION = "ProcessTask"; @@ -76,33 +76,39 @@ public class KinesisDataFetcher { @Deprecated public KinesisDataFetcher(KinesisAsyncClient kinesisClient, String streamName, String shardId, int maxRecords, MetricsFactory metricsFactory) { - this(kinesisClient, StreamIdentifier.singleStreamInstance(streamName), shardId, maxRecords, metricsFactory, PollingConfig.DEFAULT_REQUEST_TIMEOUT); + this(kinesisClient, new KinesisDataFetcherProviderConfig( + StreamIdentifier.singleStreamInstance(streamName), + shardId, + metricsFactory, + maxRecords, + PollingConfig.DEFAULT_REQUEST_TIMEOUT + )); } /** - * Constructs KinesisDataFetcher. 
- * @param kinesisClient - * @param streamIdentifier - * @param shardId - * @param maxRecords - * @param metricsFactory - * @param maxFutureWait - */ - public KinesisDataFetcher(KinesisAsyncClient kinesisClient, StreamIdentifier streamIdentifier, String shardId, int maxRecords, MetricsFactory metricsFactory, Duration maxFutureWait) { - this.kinesisClient = kinesisClient; - this.streamIdentifier = streamIdentifier; - this.shardId = shardId; - this.maxRecords = maxRecords; - this.metricsFactory = metricsFactory; - this.maxFutureWait = maxFutureWait; - this.streamAndShardId = streamIdentifier.serialize() + ":" + shardId; - } - - /** Note: This method has package level access for testing purposes. + * Note: This method has package level access for testing purposes. + * * @return nextIterator */ @Getter(AccessLevel.PACKAGE) private String nextIterator; + + /** + * Constructs KinesisDataFetcher. + * + * @param kinesisClient + * @param kinesisDataFetcherProviderConfig + */ + public KinesisDataFetcher(KinesisAsyncClient kinesisClient, DataFetcherProviderConfig kinesisDataFetcherProviderConfig) { + this.kinesisClient = kinesisClient; + this.maxFutureWait = kinesisDataFetcherProviderConfig.getKinesisRequestTimeout(); + this.maxRecords = kinesisDataFetcherProviderConfig.getMaxRecords(); + this.metricsFactory = kinesisDataFetcherProviderConfig.getMetricsFactory(); + this.shardId = kinesisDataFetcherProviderConfig.getShardId(); + this.streamIdentifier = kinesisDataFetcherProviderConfig.getStreamIdentifier(); + this.streamAndShardId = streamIdentifier.serialize() + ":" + shardId; + } + @Getter private boolean isShardEndReached; private boolean isInitialized; @@ -114,6 +120,7 @@ public class KinesisDataFetcher { * * @return list of records of up to maxRecords size */ + @Override public DataFetcherResult getRecords() { if (!isInitialized) { throw new IllegalArgumentException("KinesisDataFetcher.records called before initialization."); @@ -187,6 +194,7 @@ public class 
KinesisDataFetcher { * @param initialCheckpoint Current checkpoint sequence number for this shard. * @param initialPositionInStream The initialPositionInStream. */ + @Override public void initialize(final String initialCheckpoint, final InitialPositionInStreamExtended initialPositionInStream) { log.info("Initializing shard {} with {}", streamAndShardId, initialCheckpoint); @@ -194,6 +202,7 @@ public class KinesisDataFetcher { isInitialized = true; } + @Override public void initialize(final ExtendedSequenceNumber initialCheckpoint, final InitialPositionInStreamExtended initialPositionInStream) { log.info("Initializing shard {} with {}", streamAndShardId, initialCheckpoint.sequenceNumber()); @@ -207,6 +216,7 @@ public class KinesisDataFetcher { * @param sequenceNumber advance the iterator to the record at this sequence number. * @param initialPositionInStream The initialPositionInStream. */ + @Override public void advanceIteratorTo(final String sequenceNumber, final InitialPositionInStreamExtended initialPositionInStream) { if (sequenceNumber == null) { @@ -228,9 +238,7 @@ public class KinesisDataFetcher { try { try { - final GetShardIteratorResponse result = FutureUtils - .resolveOrCancelFuture(kinesisClient.getShardIterator(request), maxFutureWait); - nextIterator = result.shardIterator(); + nextIterator = getNextIterator(request); success = true; } catch (ExecutionException e) { throw exceptionManager.apply(e.getCause()); @@ -260,6 +268,7 @@ public class KinesisDataFetcher { * Gets a new iterator from the last known sequence number i.e. the sequence number of the last record from the last * records call. 
*/ + @Override public void restartIterator() { if (StringUtils.isEmpty(lastKnownSequenceNumber) || initialPositionInStream == null) { throw new IllegalStateException( @@ -268,29 +277,49 @@ public class KinesisDataFetcher { advanceIteratorTo(lastKnownSequenceNumber, initialPositionInStream); } + @Override public void resetIterator(String shardIterator, String sequenceNumber, InitialPositionInStreamExtended initialPositionInStream) { this.nextIterator = shardIterator; this.lastKnownSequenceNumber = sequenceNumber; this.initialPositionInStream = initialPositionInStream; } - private GetRecordsResponse getRecords(@NonNull final String nextIterator) { - final AWSExceptionManager exceptionManager = createExceptionManager(); - GetRecordsRequest request = KinesisRequestsBuilder.getRecordsRequestBuilder().shardIterator(nextIterator) + @Override + public GetRecordsResponse getGetRecordsResponse(GetRecordsRequest request) throws ExecutionException, InterruptedException, TimeoutException { + final GetRecordsResponse response = FutureUtils.resolveOrCancelFuture(kinesisClient.getRecords(request), + maxFutureWait); + if (!isValidResponse(response)) { + throw new RetryableRetrievalException("GetRecords response is not valid for shard: " + streamAndShardId + + ". nextShardIterator: " + response.nextShardIterator() + + ". childShards: " + response.childShards() + ". 
Will retry GetRecords with the same nextIterator."); + } + return response; + } + + @Override + public GetRecordsRequest getGetRecordsRequest(String nextIterator) { + return KinesisRequestsBuilder.getRecordsRequestBuilder().shardIterator(nextIterator) .limit(maxRecords).build(); + } + + @Override + public String getNextIterator(GetShardIteratorRequest request) throws ExecutionException, InterruptedException, TimeoutException { + final GetShardIteratorResponse result = FutureUtils + .resolveOrCancelFuture(kinesisClient.getShardIterator(request), maxFutureWait); + return result.shardIterator(); + } + + @Override + public GetRecordsResponse getRecords(@NonNull final String nextIterator) { + final AWSExceptionManager exceptionManager = createExceptionManager(); + GetRecordsRequest request = getGetRecordsRequest(nextIterator); final MetricsScope metricsScope = MetricsUtil.createMetricsWithOperation(metricsFactory, OPERATION); MetricsUtil.addShardId(metricsScope, shardId); - boolean success = false; + boolean success = false ; long startTime = System.currentTimeMillis(); try { - final GetRecordsResponse response = FutureUtils.resolveOrCancelFuture(kinesisClient.getRecords(request), - maxFutureWait); - if (!isValidResponse(response)) { - throw new RetryableRetrievalException("GetRecords response is not valid for shard: " + streamAndShardId - + ". nextShardIterator: " + response.nextShardIterator() - + ". childShards: " + response.childShards() + ". 
Will retry GetRecords with the same nextIterator."); - } + final GetRecordsResponse response = getGetRecordsResponse(request); success = true; return response; } catch (ExecutionException e) { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PollingConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PollingConfig.java index cc574506..d8c2405a 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PollingConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PollingConfig.java @@ -17,31 +17,47 @@ package software.amazon.kinesis.retrieval.polling; import java.time.Duration; import java.util.Optional; - +import java.util.function.Function; import lombok.Data; +import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.NonNull; +import lombok.Setter; +import lombok.ToString; import lombok.experimental.Accessors; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; +import software.amazon.kinesis.retrieval.DataFetcherProviderConfig; import software.amazon.kinesis.retrieval.RecordsFetcherFactory; import software.amazon.kinesis.retrieval.RetrievalFactory; import software.amazon.kinesis.retrieval.RetrievalSpecificConfig; @Accessors(fluent = true) -@Data @Getter +@Setter +@ToString +@EqualsAndHashCode public class PollingConfig implements RetrievalSpecificConfig { public static final Duration DEFAULT_REQUEST_TIMEOUT = Duration.ofSeconds(30); + /** + * Configurable functional interface to override the existing DataFetcher. + */ + Function dataFetcherProvider; /** * Name of the Kinesis stream. * * @return String */ - @NonNull - private final String streamName; + private String streamName; + + /** + * @param kinesisClient Client used to access Kinesis services. 
+ */ + public PollingConfig(KinesisAsyncClient kinesisClient) { + this.kinesisClient = kinesisClient; + } /** * Client used to access to Kinesis service. @@ -60,6 +76,15 @@ public class PollingConfig implements RetrievalSpecificConfig { */ private int maxRecords = 10000; + /** + * @param streamName Name of Kinesis stream. + * @param kinesisClient Client used to access Kinesis services. + */ + public PollingConfig(String streamName, KinesisAsyncClient kinesisClient) { + this.kinesisClient = kinesisClient; + this.streamName = streamName; + } + /** * The value for how long the ShardConsumer should sleep if no records are returned from the call to * {@link KinesisAsyncClient#getRecords(GetRecordsRequest)}. @@ -105,6 +130,6 @@ public class PollingConfig implements RetrievalSpecificConfig { @Override public RetrievalFactory retrievalFactory() { return new SynchronousBlockingRetrievalFactory(streamName(), kinesisClient(), recordsFetcherFactory, - maxRecords(), kinesisRequestTimeout); + maxRecords(), kinesisRequestTimeout, dataFetcherProvider); } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java index 6e172f08..d9e00669 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java @@ -15,6 +15,8 @@ package software.amazon.kinesis.retrieval.polling; +import com.google.common.annotations.VisibleForTesting; + import java.time.Duration; import java.time.Instant; import java.util.List; @@ -25,21 +27,17 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; - import lombok.AccessLevel; +import
lombok.Data; import lombok.Getter; +import lombok.NonNull; import lombok.Setter; +import lombok.experimental.Accessors; +import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.Validate; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; - -import com.google.common.annotations.VisibleForTesting; - -import lombok.Data; -import lombok.NonNull; -import lombok.experimental.Accessors; -import lombok.extern.slf4j.Slf4j; import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.services.cloudwatch.model.StandardUnit; import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException; @@ -61,7 +59,6 @@ import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RecordsRetrieved; import software.amazon.kinesis.retrieval.RetryableRetrievalException; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; - import static software.amazon.kinesis.common.DiagnosticUtils.takeDelayedDeliveryActionIfRequired; /** @@ -108,7 +105,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { @VisibleForTesting @Getter private final LinkedBlockingQueue prefetchRecordsQueue; private final PrefetchCounters prefetchCounters; - private final KinesisDataFetcher dataFetcher; + private final DataFetcher dataFetcher; private InitialPositionInStreamExtended initialPositionInStreamExtended; private String highestSequenceNumber; @@ -215,7 +212,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { this.maxByteSize = maxByteSize; this.maxRecordsCount = maxRecordsCount; this.publisherSession = new PublisherSession(new LinkedBlockingQueue<>(this.maxPendingProcessRecordsInput), - new PrefetchCounters(), this.getRecordsRetrievalStrategy.getDataFetcher()); + new PrefetchCounters(), this.getRecordsRetrievalStrategy.dataFetcher()); this.executorService = executorService; this.metricsFactory = new 
ThreadSafeMetricsDelegatingFactory(metricsFactory); this.idleMillisBetweenCalls = idleMillisBetweenCalls; @@ -223,7 +220,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { Validate.notEmpty(operation, "Operation cannot be empty"); this.operation = operation; this.streamAndShardId = - this.getRecordsRetrievalStrategy.getDataFetcher().getStreamIdentifier().serialize() + ":" + shardId; + this.getRecordsRetrievalStrategy.dataFetcher().getStreamIdentifier().serialize() + ":" + shardId; } @Override diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousBlockingRetrievalFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousBlockingRetrievalFactory.java index 73273c34..071763fc 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousBlockingRetrievalFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousBlockingRetrievalFactory.java @@ -15,6 +15,8 @@ package software.amazon.kinesis.retrieval.polling; +import java.time.Duration; +import java.util.function.Function; import lombok.Data; import lombok.NonNull; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; @@ -22,13 +24,13 @@ import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.metrics.MetricsFactory; +import software.amazon.kinesis.retrieval.DataFetcherProviderConfig; import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; +import software.amazon.kinesis.retrieval.KinesisDataFetcherProviderConfig; import software.amazon.kinesis.retrieval.RecordsFetcherFactory; import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RetrievalFactory; -import java.time.Duration; - /** * */ @@ -42,32 
+44,71 @@ public class SynchronousBlockingRetrievalFactory implements RetrievalFactory { private final KinesisAsyncClient kinesisClient; @NonNull private final RecordsFetcherFactory recordsFetcherFactory; - // private final long listShardsBackoffTimeInMillis; - // private final int maxListShardsRetryAttempts; + private final int maxRecords; private final Duration kinesisRequestTimeout; - public SynchronousBlockingRetrievalFactory(String streamName, KinesisAsyncClient kinesisClient, RecordsFetcherFactory recordsFetcherFactory, int maxRecords, Duration kinesisRequestTimeout) { + private final Function dataFetcherProvider; + + @Deprecated + public SynchronousBlockingRetrievalFactory(String streamName, + KinesisAsyncClient kinesisClient, + RecordsFetcherFactory recordsFetcherFactory, + int maxRecords, + Duration kinesisRequestTimeout) { + this(streamName, + kinesisClient, + recordsFetcherFactory, + maxRecords, + kinesisRequestTimeout, + defaultDataFetcherProvider(kinesisClient)); + } + + public SynchronousBlockingRetrievalFactory(String streamName, + KinesisAsyncClient kinesisClient, + RecordsFetcherFactory recordsFetcherFactory, + int maxRecords, + Duration kinesisRequestTimeout, + Function dataFetcherProvider) { this.streamName = streamName; this.kinesisClient = kinesisClient; this.recordsFetcherFactory = recordsFetcherFactory; this.maxRecords = maxRecords; this.kinesisRequestTimeout = kinesisRequestTimeout; + this.dataFetcherProvider = dataFetcherProvider == null ? 
+ defaultDataFetcherProvider(kinesisClient) : dataFetcherProvider; } @Deprecated - public SynchronousBlockingRetrievalFactory(String streamName, KinesisAsyncClient kinesisClient, RecordsFetcherFactory recordsFetcherFactory, int maxRecords) { + public SynchronousBlockingRetrievalFactory(String streamName, + KinesisAsyncClient kinesisClient, + RecordsFetcherFactory recordsFetcherFactory, + int maxRecords) { this(streamName, kinesisClient, recordsFetcherFactory, maxRecords, PollingConfig.DEFAULT_REQUEST_TIMEOUT); } + private static Function defaultDataFetcherProvider( + KinesisAsyncClient kinesisClient) { + return dataFetcherProviderConfig -> new KinesisDataFetcher(kinesisClient, dataFetcherProviderConfig); + } + @Override public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(@NonNull final ShardInfo shardInfo, - @NonNull final MetricsFactory metricsFactory) { + @NonNull final MetricsFactory metricsFactory) { final StreamIdentifier streamIdentifier = shardInfo.streamIdentifierSerOpt().isPresent() ? 
StreamIdentifier.multiStreamInstance(shardInfo.streamIdentifierSerOpt().get()) : StreamIdentifier.singleStreamInstance(streamName); - return new SynchronousGetRecordsRetrievalStrategy( - new KinesisDataFetcher(kinesisClient, streamIdentifier, shardInfo.shardId(), maxRecords, metricsFactory, kinesisRequestTimeout)); + + final DataFetcherProviderConfig kinesisDataFetcherProviderConfig = new KinesisDataFetcherProviderConfig( + streamIdentifier, + shardInfo.shardId(), + metricsFactory, + maxRecords, + kinesisRequestTimeout); + + final DataFetcher dataFetcher = this.dataFetcherProvider.apply(kinesisDataFetcherProviderConfig); + + return new SynchronousGetRecordsRetrievalStrategy(dataFetcher); } @Override diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousGetRecordsRetrievalStrategy.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousGetRecordsRetrievalStrategy.java index c6fa619b..7f3b54d5 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousGetRecordsRetrievalStrategy.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousGetRecordsRetrievalStrategy.java @@ -14,6 +14,7 @@ */ package software.amazon.kinesis.retrieval.polling; +import java.util.Optional; import lombok.Data; import lombok.NonNull; import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; @@ -26,8 +27,9 @@ import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; @Data @KinesisClientInternalApi public class SynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrievalStrategy { + @NonNull - private final KinesisDataFetcher dataFetcher; + private final DataFetcher dataFetcher; @Override public GetRecordsResponse getRecords(final int maxRecords) { @@ -45,9 +47,14 @@ public class SynchronousGetRecordsRetrievalStrategy implements GetRecordsRetriev public boolean isShutdown() { return false; 
} - + @Override public KinesisDataFetcher getDataFetcher() { + throw new UnsupportedOperationException("Deprecated. Use dataFetcher() to retrieve a dataFetcher"); + } + + @Override + public DataFetcher dataFetcher() { return dataFetcher; } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousPrefetchingRetrievalFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousPrefetchingRetrievalFactory.java index 4a8c5250..efa11e70 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousPrefetchingRetrievalFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousPrefetchingRetrievalFactory.java @@ -17,7 +17,6 @@ package software.amazon.kinesis.retrieval.polling; import java.time.Duration; import java.util.concurrent.ExecutorService; - import lombok.NonNull; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.annotations.KinesisClientInternalApi; @@ -25,6 +24,7 @@ import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; +import software.amazon.kinesis.retrieval.KinesisDataFetcherProviderConfig; import software.amazon.kinesis.retrieval.RecordsFetcherFactory; import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RetrievalFactory; @@ -71,9 +71,15 @@ public class SynchronousPrefetchingRetrievalFactory implements RetrievalFactory final StreamIdentifier streamIdentifier = shardInfo.streamIdentifierSerOpt().isPresent() ? 
StreamIdentifier.multiStreamInstance(shardInfo.streamIdentifierSerOpt().get()) : StreamIdentifier.singleStreamInstance(streamName); + return new SynchronousGetRecordsRetrievalStrategy( - new KinesisDataFetcher(kinesisClient, streamIdentifier, shardInfo.shardId(), - maxRecords, metricsFactory, maxFutureWait)); + new KinesisDataFetcher(kinesisClient, new KinesisDataFetcherProviderConfig( + streamIdentifier, + shardInfo.shardId(), + metricsFactory, + maxRecords, + maxFutureWait + ))); } @Override diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java index f7051ec4..281d738c 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java @@ -107,7 +107,7 @@ public class PrefetchRecordsPublisherTest { @Mock private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; @Mock - private KinesisDataFetcher dataFetcher; + private DataFetcher dataFetcher; @Mock private InitialPositionInStreamExtended initialPosition; @Mock @@ -124,7 +124,7 @@ public class PrefetchRecordsPublisherTest { @Before public void setup() { - when(getRecordsRetrievalStrategy.getDataFetcher()).thenReturn(dataFetcher); + when(getRecordsRetrievalStrategy.dataFetcher()).thenReturn(dataFetcher); when(dataFetcher.getStreamIdentifier()).thenReturn(StreamIdentifier.singleStreamInstance("testStream")); executorService = spy(Executors.newFixedThreadPool(1)); getRecordsCache = new PrefetchRecordsPublisher( diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/RecordsFetcherFactoryTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/RecordsFetcherFactoryTest.java index 
d6d8b6d5..ddc25e21 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/RecordsFetcherFactoryTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/RecordsFetcherFactoryTest.java @@ -40,14 +40,14 @@ public class RecordsFetcherFactoryTest { @Mock private MetricsFactory metricsFactory; @Mock - private KinesisDataFetcher kinesisDataFetcher; + private DataFetcher dataFetcher; @Before public void setUp() { MockitoAnnotations.initMocks(this); recordsFetcherFactory = new SimpleRecordsFetcherFactory(); - when(getRecordsRetrievalStrategy.getDataFetcher()).thenReturn(kinesisDataFetcher); - when(kinesisDataFetcher.getStreamIdentifier()).thenReturn(StreamIdentifier.singleStreamInstance("stream")); + when(getRecordsRetrievalStrategy.dataFetcher()).thenReturn(dataFetcher); + when(dataFetcher.getStreamIdentifier()).thenReturn(StreamIdentifier.singleStreamInstance("stream")); } @Test @@ -66,5 +66,4 @@ public class RecordsFetcherFactoryTest { metricsFactory, 1); assertThat(recordsCache, instanceOf(PrefetchRecordsPublisher.class)); } - }