diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java index 9370994b..708f6e59 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java @@ -414,11 +414,14 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { makeRetrievalAttempt(); } catch(PositionResetException pre) { log.debug("{} : Position was reset while attempting to add item to queue.", streamAndShardId); + } catch (Throwable e) { + log.error("{} : Unexpected exception was thrown. This could probably be an issue or a bug." + + " Please search for the exception/error online to check what is going on. If the " + + "issue persists or is a recurring problem, feel free to open an issue on, " + + "https://github.com/awslabs/amazon-kinesis-client.", streamAndShardId, e); } finally { resetLock.readLock().unlock(); } - - } callShutdownOnStrategy(); } @@ -469,11 +472,6 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { log.error("{} : Exception thrown while fetching records from Kinesis", streamAndShardId, e); } catch (SdkException e) { log.error("{} : Exception thrown while fetching records from Kinesis", streamAndShardId, e); - } catch (Throwable e) { - log.error("{} : Unexpected exception was thrown. This could probably be an issue or a bug." + - " Please search for the exception/error online to check what is going on. If the " + - "issue persists or is a recurring problem, feel free to open an issue on, " + - "https://github.com/awslabs/amazon-kinesis-client.", streamAndShardId, e); } finally { MetricsUtil.endScope(scope); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherIntegrationTest.java index 461fce71..f57a7a3a 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherIntegrationTest.java @@ -24,10 +24,13 @@ import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static software.amazon.kinesis.utils.BlockingUtils.blockUntilConditionSatisfied; import static software.amazon.kinesis.utils.BlockingUtils.blockUntilRecordsAvailable; import java.util.ArrayList; @@ -38,6 +41,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; @@ -49,7 +53,6 @@ import org.mockito.runners.MockitoJUnitRunner; import org.mockito.stubbing.Answer; import software.amazon.awssdk.core.SdkBytes; -import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import lombok.extern.slf4j.Slf4j; @@ -224,6 +227,25 @@ public class PrefetchRecordsPublisherIntegrationTest { verify(dataFetcher).restartIterator(); } + @Test + public void testExpiredIteratorExceptionWithInnerRestartIteratorException() { + when(dataFetcher.getRecords()) + .thenThrow(ExpiredIteratorException.builder().message("ExpiredIterator").build()) + .thenCallRealMethod() + .thenThrow(ExpiredIteratorException.builder().message("ExpiredIterator").build()) + .thenCallRealMethod(); + + doThrow(IllegalStateException.class).when(dataFetcher).restartIterator(); + + getRecordsCache.start(extendedSequenceNumber, initialPosition); + + final boolean conditionSatisfied = blockUntilConditionSatisfied(() -> + getRecordsCache.getPublisherSession().prefetchRecordsQueue().size() == MAX_SIZE, 5000); + Assert.assertTrue(conditionSatisfied); + // Asserts the exception was only thrown once for restartIterator + verify(dataFetcher, times(2)).restartIterator(); + } + private RecordsRetrieved evictPublishedEvent(PrefetchRecordsPublisher publisher, String shardId) { return publisher.getPublisherSession().evictPublishedRecordAndUpdateDemand(shardId); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java index b51b08df..f2500867 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java @@ -448,6 +448,26 @@ public class PrefetchRecordsPublisherTest { verify(dataFetcher).restartIterator(); } + @Test + public void testExpiredIteratorExceptionWithIllegalStateException() { + // This test validates that the daemon thread doesn't die when ExpiredIteratorException occurs with an + // IllegalStateException. + + when(getRecordsRetrievalStrategy.getRecords(MAX_RECORDS_PER_CALL)) + .thenThrow(ExpiredIteratorException.builder().build()) + .thenReturn(getRecordsResponse) + .thenThrow(ExpiredIteratorException.builder().build()) + .thenReturn(getRecordsResponse); + + doThrow(new IllegalStateException()).when(dataFetcher).restartIterator(); + + getRecordsCache.start(sequenceNumber, initialPosition); + blockUntilConditionSatisfied(() -> getRecordsCache.getPublisherSession().prefetchRecordsQueue().size() == MAX_SIZE, 300); + + // verify restartIterator was called + verify(dataFetcher, times(2)).restartIterator(); + } + @Test public void testRetryableRetrievalExceptionContinues() {