Updating branch to master (#244)

This commit is contained in:
Sahil Palvia 2017-10-17 10:39:57 -07:00 committed by GitHub
parent 816b55ffb0
commit efef97b802

View file

@ -74,7 +74,6 @@ public class KinesisDataFetcherTest {
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON);
private static final InitialPositionInStreamExtended INITIAL_POSITION_AT_TIMESTAMP =
InitialPositionInStreamExtended.newInitialPositionAtTimestamp(new Date(1000));
;
/**
* @throws java.lang.Exception
@ -307,82 +306,6 @@ public class KinesisDataFetcherTest {
return noAcceptResult;
}
@Test
public void testFetcherDoesNotAdvanceWithoutAccept() {
final String INITIAL_ITERATOR = "InitialIterator";
final String NEXT_ITERATOR_ONE = "NextIteratorOne";
final String NEXT_ITERATOR_TWO = "NextIteratorTwo";
when(kinesisProxy.getIterator(anyString(), anyString())).thenReturn(INITIAL_ITERATOR);
GetRecordsResult iteratorOneResults = mock(GetRecordsResult.class);
when(iteratorOneResults.getNextShardIterator()).thenReturn(NEXT_ITERATOR_ONE);
when(kinesisProxy.get(eq(INITIAL_ITERATOR), anyInt())).thenReturn(iteratorOneResults);
GetRecordsResult iteratorTwoResults = mock(GetRecordsResult.class);
when(kinesisProxy.get(eq(NEXT_ITERATOR_ONE), anyInt())).thenReturn(iteratorTwoResults);
when(iteratorTwoResults.getNextShardIterator()).thenReturn(NEXT_ITERATOR_TWO);
GetRecordsResult finalResult = mock(GetRecordsResult.class);
when(kinesisProxy.get(eq(NEXT_ITERATOR_TWO), anyInt())).thenReturn(finalResult);
when(finalResult.getNextShardIterator()).thenReturn(null);
KinesisDataFetcher dataFetcher = new KinesisDataFetcher(kinesisProxy, SHARD_INFO);
dataFetcher.initialize("TRIM_HORIZON", InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON));
assertNoAdvance(dataFetcher, iteratorOneResults, INITIAL_ITERATOR);
assertAdvanced(dataFetcher, iteratorOneResults, INITIAL_ITERATOR, NEXT_ITERATOR_ONE);
assertNoAdvance(dataFetcher, iteratorTwoResults, NEXT_ITERATOR_ONE);
assertAdvanced(dataFetcher, iteratorTwoResults, NEXT_ITERATOR_ONE, NEXT_ITERATOR_TWO);
assertNoAdvance(dataFetcher, finalResult, NEXT_ITERATOR_TWO);
assertAdvanced(dataFetcher, finalResult, NEXT_ITERATOR_TWO, null);
verify(kinesisProxy, times(2)).get(eq(INITIAL_ITERATOR), anyInt());
verify(kinesisProxy, times(2)).get(eq(NEXT_ITERATOR_ONE), anyInt());
verify(kinesisProxy, times(2)).get(eq(NEXT_ITERATOR_TWO), anyInt());
reset(kinesisProxy);
DataFetcherResult terminal = dataFetcher.getRecords(100);
assertThat(terminal.isShardEnd(), equalTo(true));
assertThat(terminal.getResult(), nullValue());
assertThat(terminal, equalTo(dataFetcher.TERMINAL_RESULT));
verify(kinesisProxy, never()).get(anyString(), anyInt());
}
private DataFetcherResult assertAdvanced(KinesisDataFetcher dataFetcher, GetRecordsResult expectedResult, String previousValue, String nextValue) {
DataFetcherResult acceptResult = dataFetcher.getRecords(100);
assertThat(acceptResult.getResult(), equalTo(expectedResult));
assertThat(dataFetcher.getNextIterator(), equalTo(previousValue));
assertThat(dataFetcher.isShardEndReached(), equalTo(false));
assertThat(acceptResult.accept(), equalTo(expectedResult));
assertThat(dataFetcher.getNextIterator(), equalTo(nextValue));
if (nextValue == null) {
assertThat(dataFetcher.isShardEndReached(), equalTo(true));
}
verify(kinesisProxy, times(2)).get(eq(previousValue), anyInt());
return acceptResult;
}
private DataFetcherResult assertNoAdvance(KinesisDataFetcher dataFetcher, GetRecordsResult expectedResult, String previousValue) {
assertThat(dataFetcher.getNextIterator(), equalTo(previousValue));
DataFetcherResult noAcceptResult = dataFetcher.getRecords(100);
assertThat(noAcceptResult.getResult(), equalTo(expectedResult));
assertThat(dataFetcher.getNextIterator(), equalTo(previousValue));
verify(kinesisProxy).get(eq(previousValue), anyInt());
return noAcceptResult;
}
private void testInitializeAndFetch(String iteratorType,
String seqNo,
InitialPositionInStreamExtended initialPositionInStream) throws Exception {