From d4f3c0b14a0f4af304dca832dc00648bef81213c Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Mon, 22 Jun 2020 23:23:41 -0700 Subject: [PATCH 1/3] Fixing Prefetch publisher cache restart issue --- .../retrieval/polling/PrefetchRecordsPublisher.java | 8 ++++---- .../retrieval/polling/PrefetchRecordsPublisherTest.java | 8 ++++++++ 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java index c80c9860..d92c60af 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java @@ -230,12 +230,12 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { if (executorService.isShutdown()) { throw new IllegalStateException("ExecutorService has been shutdown."); } - - publisherSession.init(extendedSequenceNumber, initialPositionInStreamExtended); - if (!started) { - log.info("{} : Starting prefetching thread.", streamAndShardId); + publisherSession.init(extendedSequenceNumber, initialPositionInStreamExtended); + log.info("{} : Starting prefetching thread and initializing publisher session.", streamAndShardId); executorService.execute(defaultGetRecordsCacheDaemon); + } else { + log.info("{} : Skipping publisher start as it was already started.", streamAndShardId); } started = true; } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java index 281d738c..20a37c26 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java @@ -145,6 +145,14 @@ public class PrefetchRecordsPublisherTest { when(getRecordsRetrievalStrategy.getRecords(eq(MAX_RECORDS_PER_CALL))).thenReturn(getRecordsResponse); } + @Test + public void testDataFetcherIsNotReInitializedOnMultipleCacheStarts() { + getRecordsCache.start(sequenceNumber, initialPosition); + getRecordsCache.start(sequenceNumber, initialPosition); + getRecordsCache.start(sequenceNumber, initialPosition); + verify(dataFetcher, times(1)).initialize(any(ExtendedSequenceNumber.class), any()); + } + @Test public void testGetRecords() { record = Record.builder().data(createByteBufferWithSize(SIZE_512_KB)).build(); From ef39ecd0df2532bf5772e2d67df195b83b576df9 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Mon, 22 Jun 2020 23:37:38 -0700 Subject: [PATCH 2/3] Log changes --- .../kinesis/retrieval/polling/PrefetchRecordsPublisher.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java index d92c60af..ef752f1b 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java @@ -231,8 +231,8 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { throw new IllegalStateException("ExecutorService has been shutdown."); } if (!started) { + log.info("{} : Starting Prefetching thread and initializing publisher session.", streamAndShardId); publisherSession.init(extendedSequenceNumber, initialPositionInStreamExtended); - log.info("{} : Starting prefetching thread and initializing publisher session.", streamAndShardId); executorService.execute(defaultGetRecordsCacheDaemon); } else { log.info("{} : Skipping publisher start as it was already started.", streamAndShardId); From 60af78f7cbd8fa1adcc5ca2f141c2575b9d26e75 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Tue, 23 Jun 2020 14:49:15 -0700 Subject: [PATCH 3/3] Adding unit test case for validating internal state on initial prefetcher failures --- .../polling/PrefetchRecordsPublisherTest.java | 35 +++++++++++++++++++ .../amazon/kinesis/utils/BlockingUtils.java | 13 +++++++ 2 files changed, 48 insertions(+) diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java index 20a37c26..f12e2310 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java @@ -31,10 +31,12 @@ import static org.mockito.Matchers.eq; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static software.amazon.kinesis.utils.BlockingUtils.blockUntilConditionSatisfied; import static software.amazon.kinesis.utils.BlockingUtils.blockUntilRecordsAvailable; import static software.amazon.kinesis.utils.ProcessRecordsInputMatcher.eqProcessRecordsInput; @@ -47,6 +49,7 @@ import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -57,6 +60,7 @@ import java.util.stream.Stream; import org.apache.commons.lang3.StringUtils; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; @@ -153,6 +157,37 @@ public class PrefetchRecordsPublisherTest { verify(dataFetcher, times(1)).initialize(any(ExtendedSequenceNumber.class), any()); } + @Test + public void testPrefetchPublisherInternalStateNotModifiedWhenPrefetcherThreadStartFails() { + doThrow(new RejectedExecutionException()).doThrow(new RejectedExecutionException()).doCallRealMethod() + .when(executorService).execute(any()); + // Initialize try 1 + tryPrefetchCacheStart(); + blockUntilConditionSatisfied(() -> getRecordsCache.getPublisherSession().prefetchRecordsQueue().size() == MAX_SIZE, 300); + verifyInternalState(0); + // Initialize try 2 + tryPrefetchCacheStart(); + blockUntilConditionSatisfied(() -> getRecordsCache.getPublisherSession().prefetchRecordsQueue().size() == MAX_SIZE, 300); + verifyInternalState(0); + // Initialize try 3 + tryPrefetchCacheStart(); + blockUntilConditionSatisfied(() -> getRecordsCache.getPublisherSession().prefetchRecordsQueue().size() == MAX_SIZE, 300); + verifyInternalState(MAX_SIZE); + verify(dataFetcher, times(3)).initialize(any(ExtendedSequenceNumber.class), any()); + } + + private void tryPrefetchCacheStart() { + try { + getRecordsCache.start(sequenceNumber, initialPosition); + } catch (Exception e) { + // suppress exception + } + } + + private void verifyInternalState(int queueSize) { + Assert.assertTrue(getRecordsCache.getPublisherSession().prefetchRecordsQueue().size() == queueSize); + } + @Test public void testGetRecords() { record = Record.builder().data(createByteBufferWithSize(SIZE_512_KB)).build(); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/BlockingUtils.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/BlockingUtils.java index fa10557f..0d68e51b 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/BlockingUtils.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/BlockingUtils.java @@ -35,4 +35,17 @@ public class BlockingUtils { throw new RuntimeException("No records found"); } } + + public static boolean blockUntilConditionSatisfied(Supplier conditionSupplier, long timeoutMillis) { + while(!conditionSupplier.get() && timeoutMillis > 0 ) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + timeoutMillis -= 100; + } + return conditionSupplier.get(); + } + }