diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcherTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcherTest.java index 6648b919..e27e8d26 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcherTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcherTest.java @@ -74,6 +74,7 @@ public class KinesisDataFetcherTest { InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON); private static final InitialPositionInStreamExtended INITIAL_POSITION_AT_TIMESTAMP = InitialPositionInStreamExtended.newInitialPositionAtTimestamp(new Date(1000)); + ; /** * @throws java.lang.Exception @@ -306,6 +307,82 @@ public class KinesisDataFetcherTest { return noAcceptResult; } + @Test + public void testFetcherDoesNotAdvanceWithoutAccept() { + final String INITIAL_ITERATOR = "InitialIterator"; + final String NEXT_ITERATOR_ONE = "NextIteratorOne"; + final String NEXT_ITERATOR_TWO = "NextIteratorTwo"; + when(kinesisProxy.getIterator(anyString(), anyString())).thenReturn(INITIAL_ITERATOR); + GetRecordsResult iteratorOneResults = mock(GetRecordsResult.class); + when(iteratorOneResults.getNextShardIterator()).thenReturn(NEXT_ITERATOR_ONE); + when(kinesisProxy.get(eq(INITIAL_ITERATOR), anyInt())).thenReturn(iteratorOneResults); + + GetRecordsResult iteratorTwoResults = mock(GetRecordsResult.class); + when(kinesisProxy.get(eq(NEXT_ITERATOR_ONE), anyInt())).thenReturn(iteratorTwoResults); + when(iteratorTwoResults.getNextShardIterator()).thenReturn(NEXT_ITERATOR_TWO); + + GetRecordsResult finalResult = mock(GetRecordsResult.class); + when(kinesisProxy.get(eq(NEXT_ITERATOR_TWO), anyInt())).thenReturn(finalResult); + when(finalResult.getNextShardIterator()).thenReturn(null); + + + KinesisDataFetcher dataFetcher = new KinesisDataFetcher(kinesisProxy, SHARD_INFO); + dataFetcher.initialize("TRIM_HORIZON", InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON)); + + assertNoAdvance(dataFetcher, iteratorOneResults, INITIAL_ITERATOR); + assertAdvanced(dataFetcher, iteratorOneResults, INITIAL_ITERATOR, NEXT_ITERATOR_ONE); + + assertNoAdvance(dataFetcher, iteratorTwoResults, NEXT_ITERATOR_ONE); + assertAdvanced(dataFetcher, iteratorTwoResults, NEXT_ITERATOR_ONE, NEXT_ITERATOR_TWO); + + assertNoAdvance(dataFetcher, finalResult, NEXT_ITERATOR_TWO); + assertAdvanced(dataFetcher, finalResult, NEXT_ITERATOR_TWO, null); + + verify(kinesisProxy, times(2)).get(eq(INITIAL_ITERATOR), anyInt()); + verify(kinesisProxy, times(2)).get(eq(NEXT_ITERATOR_ONE), anyInt()); + verify(kinesisProxy, times(2)).get(eq(NEXT_ITERATOR_TWO), anyInt()); + + reset(kinesisProxy); + + DataFetcherResult terminal = dataFetcher.getRecords(100); + assertThat(terminal.isShardEnd(), equalTo(true)); + assertThat(terminal.getResult(), nullValue()); + assertThat(terminal, equalTo(dataFetcher.TERMINAL_RESULT)); + + verify(kinesisProxy, never()).get(anyString(), anyInt()); + } + + + private DataFetcherResult assertAdvanced(KinesisDataFetcher dataFetcher, GetRecordsResult expectedResult, String previousValue, String nextValue) { + DataFetcherResult acceptResult = dataFetcher.getRecords(100); + assertThat(acceptResult.getResult(), equalTo(expectedResult)); + + assertThat(dataFetcher.getNextIterator(), equalTo(previousValue)); + assertThat(dataFetcher.isShardEndReached(), equalTo(false)); + + assertThat(acceptResult.accept(), equalTo(expectedResult)); + assertThat(dataFetcher.getNextIterator(), equalTo(nextValue)); + if (nextValue == null) { + assertThat(dataFetcher.isShardEndReached(), equalTo(true)); + } + + verify(kinesisProxy, times(2)).get(eq(previousValue), anyInt()); + + return acceptResult; + } + + private DataFetcherResult assertNoAdvance(KinesisDataFetcher dataFetcher, GetRecordsResult expectedResult, String previousValue) { + assertThat(dataFetcher.getNextIterator(), equalTo(previousValue)); + DataFetcherResult noAcceptResult = dataFetcher.getRecords(100); + assertThat(noAcceptResult.getResult(), equalTo(expectedResult)); + + assertThat(dataFetcher.getNextIterator(), equalTo(previousValue)); + + verify(kinesisProxy).get(eq(previousValue), anyInt()); + + return noAcceptResult; + } + private void testInitializeAndFetch(String iteratorType, String seqNo, InitialPositionInStreamExtended initialPositionInStream) throws Exception {