From a6f4aa9651c86717b5e8d1ea44207032792e1953 Mon Sep 17 00:00:00 2001 From: "Pfifer, Justin" Date: Wed, 18 Apr 2018 11:08:21 -0700 Subject: [PATCH] Get consumer running again Temporary fix for lease management <=> checkpoint issue Initialize the get records cache with the shard position, and pass that configuration to the data fetcher. --- .../kinesis/checkpoint/CheckpointFactory.java | 4 ++-- .../checkpoint/DynamoDBCheckpointFactory.java | 11 ++--------- .../amazon/kinesis/coordinator/Scheduler.java | 16 +++++++++++----- .../amazon/kinesis/leases/LeaseRenewer.java | 3 ++- .../kinesis/lifecycle/ConsumerStates.java | 3 ++- .../kinesis/lifecycle/InitializeTask.java | 9 +++++++++ .../retrieval/BlockingGetRecordsCache.java | 8 ++++++-- .../kinesis/retrieval/GetRecordsCache.java | 4 +++- .../retrieval/PrefetchGetRecordsCache.java | 8 ++++++-- .../retrieval/RecordsFetcherFactory.java | 10 +++++----- .../kinesis/retrieval/RetrievalConfig.java | 3 ++- .../kinesis/retrieval/RetrievalFactory.java | 3 ++- .../SynchronousBlockingRetrievalFactory.java | 7 +++++-- .../kinesis/coordinator/SchedulerTest.java | 5 +++-- .../kinesis/lifecycle/ShardConsumerTest.java | 4 ++-- ...refetchGetRecordsCacheIntegrationTest.java | 16 +++++++++++----- .../PrefetchGetRecordsCacheTest.java | 19 +++++++++++++------ 17 files changed, 86 insertions(+), 47 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/CheckpointFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/CheckpointFactory.java index 6eab45d0..693119ba 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/CheckpointFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/CheckpointFactory.java @@ -15,12 +15,12 @@ package software.amazon.kinesis.checkpoint; -import software.amazon.kinesis.leases.ILeaseManager; +import software.amazon.kinesis.leases.KinesisClientLibLeaseCoordinator; import software.amazon.kinesis.processor.ICheckpoint; /** * */ public interface CheckpointFactory { - ICheckpoint createCheckpoint(); + ICheckpoint createCheckpoint(KinesisClientLibLeaseCoordinator leaseCoordinator); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/DynamoDBCheckpointFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/DynamoDBCheckpointFactory.java index 6c5b46f0..0cbfdaa9 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/DynamoDBCheckpointFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/DynamoDBCheckpointFactory.java @@ -40,14 +40,7 @@ public class DynamoDBCheckpointFactory implements CheckpointFactory { private final IMetricsFactory metricsFactory; @Override - public ICheckpoint createCheckpoint() { - return new KinesisClientLibLeaseCoordinator(leaseManager, - workerIdentifier, - failoverTimeMillis, - epsilonMillis, - maxLeasesForWorker, - maxLeasesToStealAtOneTime, - maxLeaseRenewalThreads, - metricsFactory); + public ICheckpoint createCheckpoint(KinesisClientLibLeaseCoordinator leaseCoordinator) { + return leaseCoordinator; } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index effe8de4..835f0e76 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -37,7 +37,6 @@ import lombok.Getter; import lombok.NonNull; import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; -import software.amazon.kinesis.checkpoint.Checkpoint; import software.amazon.kinesis.checkpoint.CheckpointConfig; import software.amazon.kinesis.leases.ILeaseManager; import software.amazon.kinesis.leases.KinesisClientLease; @@ -144,12 +143,19 @@ public class Scheduler implements Runnable { this.retrievalConfig = retrievalConfig; this.applicationName = this.coordinatorConfig.applicationName(); - this.checkpoint = this.checkpointConfig.checkpointFactory().createCheckpoint(); + + this.leaseCoordinator = + this.leaseManagementConfig.leaseManagementFactory().createKinesisClientLibLeaseCoordinator(); + + // + // TODO: Figure out what to do with lease manage <=> checkpoint relationship + // + this.checkpoint = this.checkpointConfig.checkpointFactory().createCheckpoint(this.leaseCoordinator); + this.idleTimeInMilliseconds = this.retrievalConfig.idleTimeBetweenReadsInMillis(); this.parentShardPollIntervalMillis = this.coordinatorConfig.parentShardPollIntervalMillis(); this.executorService = this.coordinatorConfig.coordinatorFactory().createExecutorService(); - this.leaseCoordinator = - this.leaseManagementConfig.leaseManagementFactory().createKinesisClientLibLeaseCoordinator(); + this.shardSyncTaskManager = this.leaseManagementConfig.leaseManagementFactory().createShardSyncTaskManager(); this.shardPrioritization = this.coordinatorConfig.shardPrioritization(); this.cleanupLeasesUponShardCompletion = this.leaseManagementConfig.cleanupLeasesUponShardCompletion(); @@ -527,7 +533,7 @@ public class Scheduler implements Runnable { streamName, leaseManager, executorService, - retrievalConfig.retrievalFactory().createGetRecordsCache(shardInfo), + retrievalConfig.retrievalFactory().createGetRecordsCache(shardInfo, metricsFactory), processorFactory.createRecordProcessor(), checkpoint, coordinatorConfig.coordinatorFactory().createRecordProcessorCheckpointer(shardInfo, diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRenewer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRenewer.java index aa33ca0f..330ec30e 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRenewer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRenewer.java @@ -168,7 +168,8 @@ public class LeaseRenewer implements ILeaseRenewer { // Don't renew expired lease during regular renewals. getCopyOfHeldLease may have returned null // triggering the application processing to treat this as a lost lease (fail checkpoint with // ShutdownException). - if (renewEvenIfExpired || (!lease.isExpired(leaseDurationNanos, System.nanoTime()))) { + boolean isLeaseExpired = lease.isExpired(leaseDurationNanos, System.nanoTime()); + if (renewEvenIfExpired || !isLeaseExpired) { renewedLease = leaseManager.renewLease(lease); } if (renewedLease) { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java index fe85029b..ba8bb9d0 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java @@ -257,7 +257,8 @@ class ConsumerStates { return new InitializeTask(consumer.shardInfo(), consumer.recordProcessor(), consumer.checkpoint(), - consumer.recordProcessorCheckpointer(), + consumer.recordProcessorCheckpointer(), consumer.initialPositionInStream(), + consumer.getRecordsCache(), consumer.taskBackoffTimeMillis()); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/InitializeTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/InitializeTask.java index 67662d2d..85983f17 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/InitializeTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/InitializeTask.java @@ -14,6 +14,7 @@ */ package software.amazon.kinesis.lifecycle; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended; import lombok.NonNull; import lombok.RequiredArgsConstructor; import software.amazon.kinesis.coordinator.RecordProcessorCheckpointer; @@ -21,6 +22,7 @@ import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.processor.ICheckpoint; import software.amazon.kinesis.processor.IRecordProcessor; import software.amazon.kinesis.checkpoint.Checkpoint; +import software.amazon.kinesis.retrieval.GetRecordsCache; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import software.amazon.kinesis.metrics.MetricsHelper; import software.amazon.kinesis.metrics.MetricsLevel; @@ -43,6 +45,11 @@ public class InitializeTask implements ITask { private final ICheckpoint checkpoint; @NonNull private final RecordProcessorCheckpointer recordProcessorCheckpointer; + @NonNull + private final InitialPositionInStreamExtended initialPositionInStream; + @NonNull + private final GetRecordsCache cache; + // Back off for this interval if we encounter a problem (exception) private final long backoffTimeMillis; @@ -64,6 +71,8 @@ public class InitializeTask implements ITask { Checkpoint initialCheckpointObject = checkpoint.getCheckpointObject(shardInfo.shardId()); ExtendedSequenceNumber initialCheckpoint = initialCheckpointObject.getCheckpoint(); + cache.start(initialCheckpoint, initialPositionInStream); + recordProcessorCheckpointer.largestPermittedCheckpointValue(initialCheckpoint); recordProcessorCheckpointer.setInitialCheckpointValue(initialCheckpoint); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/BlockingGetRecordsCache.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/BlockingGetRecordsCache.java index aef56945..6a4ead27 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/BlockingGetRecordsCache.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/BlockingGetRecordsCache.java @@ -15,9 +15,12 @@ package software.amazon.kinesis.retrieval; -import software.amazon.kinesis.lifecycle.ProcessRecordsInput; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended; import com.amazonaws.services.kinesis.model.GetRecordsResult; +import software.amazon.kinesis.lifecycle.ProcessRecordsInput; +import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; + /** * This is the BlockingGetRecordsCache class. This class blocks any calls to the getRecords on the * GetRecordsRetrievalStrategy class. @@ -33,7 +36,8 @@ public class BlockingGetRecordsCache implements GetRecordsCache { } @Override - public void start() { + public void start(ExtendedSequenceNumber extendedSequenceNumber, + InitialPositionInStreamExtended initialPositionInStreamExtended) { // // Nothing to do here // diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/GetRecordsCache.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/GetRecordsCache.java index 1f0ff240..bde18dcf 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/GetRecordsCache.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/GetRecordsCache.java @@ -15,7 +15,9 @@ package software.amazon.kinesis.retrieval; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended; import software.amazon.kinesis.lifecycle.ProcessRecordsInput; +import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; /** * This class is used as a cache for Prefetching data from Kinesis. @@ -24,7 +26,7 @@ public interface GetRecordsCache { /** * This method calls the start behavior on the cache, if available. */ - void start(); + void start(ExtendedSequenceNumber extendedSequenceNumber, InitialPositionInStreamExtended initialPositionInStreamExtended); /** * This method returns the next set of records from the Cache if present, or blocks the request till it gets the diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/PrefetchGetRecordsCache.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/PrefetchGetRecordsCache.java index 9b971ae3..acdf94c0 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/PrefetchGetRecordsCache.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/PrefetchGetRecordsCache.java @@ -20,6 +20,7 @@ import java.time.Instant; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended; import org.apache.commons.lang.Validate; import com.amazonaws.SdkClientException; @@ -34,6 +35,7 @@ import com.amazonaws.services.kinesis.model.GetRecordsResult; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; +import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; /** * This is the prefetch caching class, this class spins up a thread if prefetching is enabled. That thread fetches the @@ -105,10 +107,12 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { } @Override - public void start() { + public void start(ExtendedSequenceNumber extendedSequenceNumber, InitialPositionInStreamExtended initialPositionInStreamExtended) { if (executorService.isShutdown()) { throw new IllegalStateException("ExecutorService has been shutdown."); } + + dataFetcher.initialize(extendedSequenceNumber, initialPositionInStreamExtended); if (!started) { log.info("Starting prefetching thread."); @@ -150,7 +154,7 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { @Override public void addDataArrivedListener(DataArrivedListener dataArrivedListener) { - if (dataArrivedListener != null) { + if (this.dataArrivedListener != null) { log.warn("Attempting to reset the data arrived listener for {}. This shouldn't happen", shardId); } this.dataArrivedListener = dataArrivedListener; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsFetcherFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsFetcherFactory.java index 5e1ab1a8..e5a0bfc7 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsFetcherFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsFetcherFactory.java @@ -39,7 +39,7 @@ public interface RecordsFetcherFactory { /** * Sets the maximum number of ProcessRecordsInput objects the GetRecordsCache can hold, before further requests are * blocked. - * + * * @param maxPendingProcessRecordsInput The maximum number of ProcessRecordsInput objects that the cache will accept * before blocking. */ @@ -48,7 +48,7 @@ public interface RecordsFetcherFactory { /** * Sets the max byte size for the GetRecordsCache, before further requests are blocked. The byte size of the cache * is the sum of byte size of all the ProcessRecordsInput objects in the cache at any point of time. - * + * * @param maxByteSize The maximum byte size for the cache before blocking. */ void setMaxByteSize(int maxByteSize); @@ -57,21 +57,21 @@ public interface RecordsFetcherFactory { * Sets the max number of records for the GetRecordsCache can hold, before further requests are blocked. The records * count is the sum of all records present in across all the ProcessRecordsInput objects in the cache at any point * of time. - * + * * @param maxRecordsCount The mximum number of records in the cache before blocking. */ void setMaxRecordsCount(int maxRecordsCount); /** * Sets the dataFetchingStrategy to determine the type of GetRecordsCache to be used. - * + * * @param dataFetchingStrategy Fetching strategy to be used */ void setDataFetchingStrategy(DataFetchingStrategy dataFetchingStrategy); /** * Sets the maximum idle time between two get calls. - * + * * @param idleMillisBetweenCalls Sleep millis between calls. */ void setIdleMillisBetweenCalls(long idleMillisBetweenCalls); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java index fd6377c0..d92c8701 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java @@ -19,8 +19,8 @@ import java.util.Optional; import com.amazonaws.services.kinesis.AmazonKinesis; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; - import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended; + import lombok.Data; import lombok.NonNull; import lombok.experimental.Accessors; @@ -119,6 +119,7 @@ public class RetrievalConfig { public RetrievalFactory retrievalFactory() { if (retrievalFactory == null) { retrievalFactory = new SynchronousBlockingRetrievalFactory(streamName(), amazonKinesis(), + recordsFetcherFactory, listShardsBackoffTimeInMillis(), maxListShardsRetryAttempts(), maxRecords()); } return retrievalFactory; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalFactory.java index 76d85fcb..8c74edf3 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalFactory.java @@ -16,6 +16,7 @@ package software.amazon.kinesis.retrieval; import software.amazon.kinesis.leases.ShardInfo; +import software.amazon.kinesis.metrics.IMetricsFactory; /** * @@ -23,5 +24,5 @@ import software.amazon.kinesis.leases.ShardInfo; public interface RetrievalFactory { GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(ShardInfo shardInfo); - GetRecordsCache createGetRecordsCache(ShardInfo shardInfo); + GetRecordsCache createGetRecordsCache(ShardInfo shardInfo, IMetricsFactory metricsFactory); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/SynchronousBlockingRetrievalFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/SynchronousBlockingRetrievalFactory.java index b3a77ad6..8307159a 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/SynchronousBlockingRetrievalFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/SynchronousBlockingRetrievalFactory.java @@ -19,6 +19,7 @@ import com.amazonaws.services.kinesis.AmazonKinesis; import lombok.Data; import lombok.NonNull; import software.amazon.kinesis.leases.ShardInfo; +import software.amazon.kinesis.metrics.IMetricsFactory; /** * @@ -34,6 +35,8 @@ public class SynchronousBlockingRetrievalFactory implements RetrievalFactory { private final String streamName; @NonNull private final AmazonKinesis amazonKinesis; + private final RecordsFetcherFactory recordsFetcherFactory; + private final long listShardsBackoffTimeInMillis; private final int maxListShardsRetryAttempts; private final int maxRecords; @@ -45,7 +48,7 @@ public class SynchronousBlockingRetrievalFactory implements RetrievalFactory { } @Override - public GetRecordsCache createGetRecordsCache(@NonNull final ShardInfo shardInfo) { - throw new UnsupportedOperationException(); + public GetRecordsCache createGetRecordsCache(@NonNull final ShardInfo shardInfo, IMetricsFactory metricsFactory) { + return recordsFetcherFactory.createRecordsFetcher(createGetRecordsRetrievalStrategy(shardInfo), shardInfo.shardId(), metricsFactory, maxRecords); } } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index 55cfd358..dbed7d6c 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -65,6 +65,7 @@ import software.amazon.kinesis.lifecycle.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.ShardConsumer; import software.amazon.kinesis.lifecycle.ShutdownInput; import software.amazon.kinesis.lifecycle.ShutdownReason; +import software.amazon.kinesis.metrics.IMetricsFactory; import software.amazon.kinesis.metrics.MetricsConfig; import software.amazon.kinesis.processor.ICheckpoint; import software.amazon.kinesis.processor.IRecordProcessor; @@ -131,7 +132,7 @@ public class SchedulerTest { when(leaseCoordinator.leaseManager()).thenReturn(leaseManager); when(shardSyncTaskManager.leaseManagerProxy()).thenReturn(leaseManagerProxy); - when(retrievalFactory.createGetRecordsCache(any(ShardInfo.class))).thenReturn(getRecordsCache); + when(retrievalFactory.createGetRecordsCache(any(ShardInfo.class), any(IMetricsFactory.class))).thenReturn(getRecordsCache); scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, metricsConfig, processorConfig, retrievalConfig); @@ -457,7 +458,7 @@ public class SchedulerTest { private class TestKinesisCheckpointFactory implements CheckpointFactory { @Override - public ICheckpoint createCheckpoint() { + public ICheckpoint createCheckpoint(KinesisClientLibLeaseCoordinator leaseCoordinator) { return checkpoint; } } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java index f9702a1b..3a7a9c3b 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java @@ -308,7 +308,7 @@ public class ShardConsumerTest { assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); consumer.consumeShard(); // initialize processor.getInitializeLatch().await(5, TimeUnit.SECONDS); - verify(getRecordsCache).start(); + verify(getRecordsCache).start(any(ExtendedSequenceNumber.class), any(InitialPositionInStreamExtended.class)); // We expect to process all records in numRecs calls for (int i = 0; i < numRecs;) { @@ -410,7 +410,7 @@ public class ShardConsumerTest { assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); consumer.consumeShard(); // initialize processor.getInitializeLatch().await(5, TimeUnit.SECONDS); - verify(getRecordsCache).start(); + verify(getRecordsCache).start(any(ExtendedSequenceNumber.class), any(InitialPositionInStreamExtended.class)); // We expect to process all records in numRecs calls for (int i = 0; i < numRecs;) { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/PrefetchGetRecordsCacheIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/PrefetchGetRecordsCacheIntegrationTest.java index bd0ade14..35313fcc 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/PrefetchGetRecordsCacheIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/PrefetchGetRecordsCacheIntegrationTest.java @@ -32,6 +32,7 @@ import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended; import org.junit.After; import org.junit.Before; import org.junit.Ignore; @@ -50,6 +51,7 @@ import com.amazonaws.services.kinesis.model.Record; import lombok.extern.slf4j.Slf4j; import software.amazon.kinesis.lifecycle.ProcessRecordsInput; import software.amazon.kinesis.metrics.NullMetricsFactory; +import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; /** * These are the integration tests for the PrefetchGetRecordsCache class. @@ -74,6 +76,10 @@ public class PrefetchGetRecordsCacheIntegrationTest { @Mock private AmazonKinesis amazonKinesis; + @Mock + private ExtendedSequenceNumber extendedSequenceNumber; + @Mock + private InitialPositionInStreamExtended initialPosition; @Before public void setup() { @@ -95,7 +101,7 @@ public class PrefetchGetRecordsCacheIntegrationTest { @Test public void testRollingCache() { - getRecordsCache.start(); + getRecordsCache.start(extendedSequenceNumber, initialPosition); sleep(IDLE_MILLIS_BETWEEN_CALLS); ProcessRecordsInput processRecordsInput1 = getRecordsCache.getNextResult(); @@ -111,7 +117,7 @@ public class PrefetchGetRecordsCacheIntegrationTest { @Test public void testFullCache() { - getRecordsCache.start(); + getRecordsCache.start(extendedSequenceNumber, initialPosition); sleep(MAX_SIZE * IDLE_MILLIS_BETWEEN_CALLS); assertEquals(getRecordsCache.getRecordsResultQueue.size(), MAX_SIZE); @@ -141,7 +147,7 @@ public class PrefetchGetRecordsCacheIntegrationTest { operation, "test-shard-2"); - getRecordsCache.start(); + getRecordsCache.start(extendedSequenceNumber, initialPosition); sleep(IDLE_MILLIS_BETWEEN_CALLS); final Record record = mock(Record.class); @@ -152,7 +158,7 @@ public class PrefetchGetRecordsCacheIntegrationTest { records.add(record); records.add(record); records.add(record); - getRecordsCache2.start(); + getRecordsCache2.start(extendedSequenceNumber, initialPosition); sleep(IDLE_MILLIS_BETWEEN_CALLS); @@ -181,7 +187,7 @@ public class PrefetchGetRecordsCacheIntegrationTest { }).thenCallRealMethod(); doNothing().when(dataFetcher).restartIterator(); - getRecordsCache.start(); + getRecordsCache.start(extendedSequenceNumber, initialPosition); sleep(IDLE_MILLIS_BETWEEN_CALLS); ProcessRecordsInput processRecordsInput = getRecordsCache.getNextResult(); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/PrefetchGetRecordsCacheTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/PrefetchGetRecordsCacheTest.java index 40283b3e..2308cc4e 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/PrefetchGetRecordsCacheTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/PrefetchGetRecordsCacheTest.java @@ -37,6 +37,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.stream.IntStream; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -52,6 +53,7 @@ import com.amazonaws.services.kinesis.model.Record; import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; import software.amazon.kinesis.retrieval.KinesisDataFetcher; import software.amazon.kinesis.retrieval.PrefetchGetRecordsCache; +import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; /** * Test class for the PrefetchGetRecordsCache class. @@ -73,6 +75,10 @@ public class PrefetchGetRecordsCacheTest { private Record record; @Mock private KinesisDataFetcher dataFetcher; + @Mock + private InitialPositionInStreamExtended initialPosition; + @Mock + private ExtendedSequenceNumber sequenceNumber; private List records; private ExecutorService executorService; @@ -80,6 +86,7 @@ public class PrefetchGetRecordsCacheTest { private PrefetchGetRecordsCache getRecordsCache; private String operation = "ProcessTask"; + @Before public void setup() { when(getRecordsRetrievalStrategy.getDataFetcher()).thenReturn(dataFetcher); @@ -114,7 +121,7 @@ public class PrefetchGetRecordsCacheTest { records.add(record); records.add(record); - getRecordsCache.start(); + getRecordsCache.start(sequenceNumber, initialPosition); ProcessRecordsInput result = getRecordsCache.getNextResult(); assertEquals(result.getRecords(), records); @@ -130,7 +137,7 @@ public class PrefetchGetRecordsCacheTest { records.add(record); - getRecordsCache.start(); + getRecordsCache.start(sequenceNumber, initialPosition); // Sleep for a few seconds for the cache to fill up. sleep(2000); @@ -144,7 +151,7 @@ public class PrefetchGetRecordsCacheTest { int recordsSize = 4500; when(records.size()).thenReturn(recordsSize); - getRecordsCache.start(); + getRecordsCache.start(sequenceNumber, initialPosition); sleep(2000); @@ -159,7 +166,7 @@ public class PrefetchGetRecordsCacheTest { int recordsSize = 200; when(records.size()).thenReturn(recordsSize); - getRecordsCache.start(); + getRecordsCache.start(sequenceNumber, initialPosition); // Sleep for a few seconds for the cache to fill up. sleep(2000); @@ -175,7 +182,7 @@ public class PrefetchGetRecordsCacheTest { IntStream.range(0, recordsSize).forEach(i -> records.add(record)); - getRecordsCache.start(); + getRecordsCache.start(sequenceNumber, initialPosition); ProcessRecordsInput processRecordsInput = getRecordsCache.getNextResult(); verify(executorService).execute(any()); @@ -207,7 +214,7 @@ public class PrefetchGetRecordsCacheTest { @Test public void testExpiredIteratorException() { - getRecordsCache.start(); + getRecordsCache.start(sequenceNumber, initialPosition); when(getRecordsRetrievalStrategy.getRecords(MAX_RECORDS_PER_CALL)).thenThrow(ExpiredIteratorException.class).thenReturn(getRecordsResult); doNothing().when(dataFetcher).restartIterator();