From d4f3c0b14a0f4af304dca832dc00648bef81213c Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Mon, 22 Jun 2020 23:23:41 -0700 Subject: [PATCH] Fixing Prefetch publisher cache restart issue --- .../retrieval/polling/PrefetchRecordsPublisher.java | 8 ++++---- .../retrieval/polling/PrefetchRecordsPublisherTest.java | 8 ++++++++ 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java index c80c9860..d92c60af 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java @@ -230,12 +230,12 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { if (executorService.isShutdown()) { throw new IllegalStateException("ExecutorService has been shutdown."); } - - publisherSession.init(extendedSequenceNumber, initialPositionInStreamExtended); - if (!started) { - log.info("{} : Starting prefetching thread.", streamAndShardId); + publisherSession.init(extendedSequenceNumber, initialPositionInStreamExtended); + log.info("{} : Starting prefetching thread and initializing publisher session.", streamAndShardId); executorService.execute(defaultGetRecordsCacheDaemon); + } else { + log.info("{} : Skipping publisher start as it was already started.", streamAndShardId); } started = true; } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java index 281d738c..20a37c26 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java @@ -145,6 +145,14 @@ public class PrefetchRecordsPublisherTest { when(getRecordsRetrievalStrategy.getRecords(eq(MAX_RECORDS_PER_CALL))).thenReturn(getRecordsResponse); } + @Test + public void testDataFetcherIsNotReInitializedOnMultipleCacheStarts() { + getRecordsCache.start(sequenceNumber, initialPosition); + getRecordsCache.start(sequenceNumber, initialPosition); + getRecordsCache.start(sequenceNumber, initialPosition); + verify(dataFetcher, times(1)).initialize(any(ExtendedSequenceNumber.class), any()); + } + @Test public void testGetRecords() { record = Record.builder().data(createByteBufferWithSize(SIZE_512_KB)).build();