Merge branch 'master' into prefetch
This commit is contained in:
commit
816b55ffb0
1 changed files with 77 additions and 0 deletions
|
|
@ -74,6 +74,7 @@ public class KinesisDataFetcherTest {
|
||||||
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON);
|
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON);
|
||||||
private static final InitialPositionInStreamExtended INITIAL_POSITION_AT_TIMESTAMP =
|
private static final InitialPositionInStreamExtended INITIAL_POSITION_AT_TIMESTAMP =
|
||||||
InitialPositionInStreamExtended.newInitialPositionAtTimestamp(new Date(1000));
|
InitialPositionInStreamExtended.newInitialPositionAtTimestamp(new Date(1000));
|
||||||
|
;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @throws java.lang.Exception
|
* @throws java.lang.Exception
|
||||||
|
|
@ -306,6 +307,82 @@ public class KinesisDataFetcherTest {
|
||||||
return noAcceptResult;
|
return noAcceptResult;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFetcherDoesNotAdvanceWithoutAccept() {
|
||||||
|
final String INITIAL_ITERATOR = "InitialIterator";
|
||||||
|
final String NEXT_ITERATOR_ONE = "NextIteratorOne";
|
||||||
|
final String NEXT_ITERATOR_TWO = "NextIteratorTwo";
|
||||||
|
when(kinesisProxy.getIterator(anyString(), anyString())).thenReturn(INITIAL_ITERATOR);
|
||||||
|
GetRecordsResult iteratorOneResults = mock(GetRecordsResult.class);
|
||||||
|
when(iteratorOneResults.getNextShardIterator()).thenReturn(NEXT_ITERATOR_ONE);
|
||||||
|
when(kinesisProxy.get(eq(INITIAL_ITERATOR), anyInt())).thenReturn(iteratorOneResults);
|
||||||
|
|
||||||
|
GetRecordsResult iteratorTwoResults = mock(GetRecordsResult.class);
|
||||||
|
when(kinesisProxy.get(eq(NEXT_ITERATOR_ONE), anyInt())).thenReturn(iteratorTwoResults);
|
||||||
|
when(iteratorTwoResults.getNextShardIterator()).thenReturn(NEXT_ITERATOR_TWO);
|
||||||
|
|
||||||
|
GetRecordsResult finalResult = mock(GetRecordsResult.class);
|
||||||
|
when(kinesisProxy.get(eq(NEXT_ITERATOR_TWO), anyInt())).thenReturn(finalResult);
|
||||||
|
when(finalResult.getNextShardIterator()).thenReturn(null);
|
||||||
|
|
||||||
|
|
||||||
|
KinesisDataFetcher dataFetcher = new KinesisDataFetcher(kinesisProxy, SHARD_INFO);
|
||||||
|
dataFetcher.initialize("TRIM_HORIZON", InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON));
|
||||||
|
|
||||||
|
assertNoAdvance(dataFetcher, iteratorOneResults, INITIAL_ITERATOR);
|
||||||
|
assertAdvanced(dataFetcher, iteratorOneResults, INITIAL_ITERATOR, NEXT_ITERATOR_ONE);
|
||||||
|
|
||||||
|
assertNoAdvance(dataFetcher, iteratorTwoResults, NEXT_ITERATOR_ONE);
|
||||||
|
assertAdvanced(dataFetcher, iteratorTwoResults, NEXT_ITERATOR_ONE, NEXT_ITERATOR_TWO);
|
||||||
|
|
||||||
|
assertNoAdvance(dataFetcher, finalResult, NEXT_ITERATOR_TWO);
|
||||||
|
assertAdvanced(dataFetcher, finalResult, NEXT_ITERATOR_TWO, null);
|
||||||
|
|
||||||
|
verify(kinesisProxy, times(2)).get(eq(INITIAL_ITERATOR), anyInt());
|
||||||
|
verify(kinesisProxy, times(2)).get(eq(NEXT_ITERATOR_ONE), anyInt());
|
||||||
|
verify(kinesisProxy, times(2)).get(eq(NEXT_ITERATOR_TWO), anyInt());
|
||||||
|
|
||||||
|
reset(kinesisProxy);
|
||||||
|
|
||||||
|
DataFetcherResult terminal = dataFetcher.getRecords(100);
|
||||||
|
assertThat(terminal.isShardEnd(), equalTo(true));
|
||||||
|
assertThat(terminal.getResult(), nullValue());
|
||||||
|
assertThat(terminal, equalTo(dataFetcher.TERMINAL_RESULT));
|
||||||
|
|
||||||
|
verify(kinesisProxy, never()).get(anyString(), anyInt());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private DataFetcherResult assertAdvanced(KinesisDataFetcher dataFetcher, GetRecordsResult expectedResult, String previousValue, String nextValue) {
|
||||||
|
DataFetcherResult acceptResult = dataFetcher.getRecords(100);
|
||||||
|
assertThat(acceptResult.getResult(), equalTo(expectedResult));
|
||||||
|
|
||||||
|
assertThat(dataFetcher.getNextIterator(), equalTo(previousValue));
|
||||||
|
assertThat(dataFetcher.isShardEndReached(), equalTo(false));
|
||||||
|
|
||||||
|
assertThat(acceptResult.accept(), equalTo(expectedResult));
|
||||||
|
assertThat(dataFetcher.getNextIterator(), equalTo(nextValue));
|
||||||
|
if (nextValue == null) {
|
||||||
|
assertThat(dataFetcher.isShardEndReached(), equalTo(true));
|
||||||
|
}
|
||||||
|
|
||||||
|
verify(kinesisProxy, times(2)).get(eq(previousValue), anyInt());
|
||||||
|
|
||||||
|
return acceptResult;
|
||||||
|
}
|
||||||
|
|
||||||
|
private DataFetcherResult assertNoAdvance(KinesisDataFetcher dataFetcher, GetRecordsResult expectedResult, String previousValue) {
|
||||||
|
assertThat(dataFetcher.getNextIterator(), equalTo(previousValue));
|
||||||
|
DataFetcherResult noAcceptResult = dataFetcher.getRecords(100);
|
||||||
|
assertThat(noAcceptResult.getResult(), equalTo(expectedResult));
|
||||||
|
|
||||||
|
assertThat(dataFetcher.getNextIterator(), equalTo(previousValue));
|
||||||
|
|
||||||
|
verify(kinesisProxy).get(eq(previousValue), anyInt());
|
||||||
|
|
||||||
|
return noAcceptResult;
|
||||||
|
}
|
||||||
|
|
||||||
private void testInitializeAndFetch(String iteratorType,
|
private void testInitializeAndFetch(String iteratorType,
|
||||||
String seqNo,
|
String seqNo,
|
||||||
InitialPositionInStreamExtended initialPositionInStream) throws Exception {
|
InitialPositionInStreamExtended initialPositionInStream) throws Exception {
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue