diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java index c80c9860..ef752f1b 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java @@ -230,12 +230,12 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { if (executorService.isShutdown()) { throw new IllegalStateException("ExecutorService has been shutdown."); } - - publisherSession.init(extendedSequenceNumber, initialPositionInStreamExtended); - if (!started) { - log.info("{} : Starting prefetching thread.", streamAndShardId); + log.info("{} : Starting Prefetching thread and initializing publisher session.", streamAndShardId); + publisherSession.init(extendedSequenceNumber, initialPositionInStreamExtended); executorService.execute(defaultGetRecordsCacheDaemon); + } else { + log.info("{} : Skipping publisher start as it was already started.", streamAndShardId); } started = true; } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java index 281d738c..f12e2310 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java @@ -31,10 +31,12 @@ import static org.mockito.Matchers.eq; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static software.amazon.kinesis.utils.BlockingUtils.blockUntilConditionSatisfied; import static software.amazon.kinesis.utils.BlockingUtils.blockUntilRecordsAvailable; import static software.amazon.kinesis.utils.ProcessRecordsInputMatcher.eqProcessRecordsInput; @@ -47,6 +49,7 @@ import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -57,6 +60,7 @@ import java.util.stream.Stream; import org.apache.commons.lang3.StringUtils; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; @@ -145,6 +149,45 @@ public class PrefetchRecordsPublisherTest { when(getRecordsRetrievalStrategy.getRecords(eq(MAX_RECORDS_PER_CALL))).thenReturn(getRecordsResponse); } + @Test + public void testDataFetcherIsNotReInitializedOnMultipleCacheStarts() { + getRecordsCache.start(sequenceNumber, initialPosition); + getRecordsCache.start(sequenceNumber, initialPosition); + getRecordsCache.start(sequenceNumber, initialPosition); + verify(dataFetcher, times(1)).initialize(any(ExtendedSequenceNumber.class), any()); + } + + @Test + public void testPrefetchPublisherInternalStateNotModifiedWhenPrefetcherThreadStartFails() { + doThrow(new RejectedExecutionException()).doThrow(new RejectedExecutionException()).doCallRealMethod() + .when(executorService).execute(any()); + // Initialize try 1 + tryPrefetchCacheStart(); + blockUntilConditionSatisfied(() -> getRecordsCache.getPublisherSession().prefetchRecordsQueue().size() == MAX_SIZE, 300); + verifyInternalState(0); + // Initialize try 2 + tryPrefetchCacheStart(); + blockUntilConditionSatisfied(() -> getRecordsCache.getPublisherSession().prefetchRecordsQueue().size() == MAX_SIZE, 300); + verifyInternalState(0); + // Initialize try 3 + tryPrefetchCacheStart(); + blockUntilConditionSatisfied(() -> getRecordsCache.getPublisherSession().prefetchRecordsQueue().size() == MAX_SIZE, 300); + verifyInternalState(MAX_SIZE); + verify(dataFetcher, times(3)).initialize(any(ExtendedSequenceNumber.class), any()); + } + + private void tryPrefetchCacheStart() { + try { + getRecordsCache.start(sequenceNumber, initialPosition); + } catch (Exception e) { + // suppress exception + } + } + + private void verifyInternalState(int queueSize) { + Assert.assertTrue(getRecordsCache.getPublisherSession().prefetchRecordsQueue().size() == queueSize); + } + @Test public void testGetRecords() { record = Record.builder().data(createByteBufferWithSize(SIZE_512_KB)).build(); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/BlockingUtils.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/BlockingUtils.java index fa10557f..0d68e51b 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/BlockingUtils.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/BlockingUtils.java @@ -35,4 +35,17 @@ public class BlockingUtils { throw new RuntimeException("No records found"); } } + + public static boolean blockUntilConditionSatisfied(Supplier conditionSupplier, long timeoutMillis) { + while(!conditionSupplier.get() && timeoutMillis > 0 ) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + timeoutMillis -= 100; + } + return conditionSupplier.get(); + } + }