diff --git a/pom.xml b/pom.xml
index f0f783a2..66006321 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
amazon-kinesis-client
jar
Amazon Kinesis Client Library for Java
- 1.8.4
+ 1.8.5-SNAPSHOT
The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data
from Amazon Kinesis.
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategy.java
index b592c29b..2e3cbd9e 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategy.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategy.java
@@ -49,7 +49,7 @@ public class AsynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrie
private final ExecutorService executorService;
private final int retryGetRecordsInSeconds;
private final String shardId;
- final Supplier> completionServiceSupplier;
+ final Supplier> completionServiceSupplier;
public AsynchronousGetRecordsRetrievalStrategy(@NonNull final KinesisDataFetcher dataFetcher,
final int retryGetRecordsInSeconds, final int maxGetRecordsThreadPool, String shardId) {
@@ -63,7 +63,7 @@ public class AsynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrie
}
AsynchronousGetRecordsRetrievalStrategy(KinesisDataFetcher dataFetcher, ExecutorService executorService,
- int retryGetRecordsInSeconds, Supplier> completionServiceSupplier,
+ int retryGetRecordsInSeconds, Supplier> completionServiceSupplier,
String shardId) {
this.dataFetcher = dataFetcher;
this.executorService = executorService;
@@ -78,9 +78,9 @@ public class AsynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrie
throw new IllegalStateException("Strategy has been shutdown");
}
GetRecordsResult result = null;
- CompletionService completionService = completionServiceSupplier.get();
- Set> futures = new HashSet<>();
- Callable retrieverCall = createRetrieverCallable(maxRecords);
+ CompletionService completionService = completionServiceSupplier.get();
+ Set> futures = new HashSet<>();
+ Callable retrieverCall = createRetrieverCallable(maxRecords);
while (true) {
try {
futures.add(completionService.submit(retrieverCall));
@@ -89,10 +89,15 @@ public class AsynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrie
}
try {
- Future resultFuture = completionService.poll(retryGetRecordsInSeconds,
+ Future resultFuture = completionService.poll(retryGetRecordsInSeconds,
TimeUnit.SECONDS);
if (resultFuture != null) {
- result = resultFuture.get();
+ //
+ // Fix to ensure that we only let the shard iterator advance when we intend to return the result
+ // to the caller. This ensures that the shard iterator is consistently advance in step with
+ // what the caller sees.
+ //
+ result = resultFuture.get().accept();
break;
}
} catch (ExecutionException e) {
@@ -106,7 +111,7 @@ public class AsynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrie
return result;
}
- private Callable createRetrieverCallable(int maxRecords) {
+ private Callable createRetrieverCallable(int maxRecords) {
ThreadSafeMetricsDelegatingScope metricsScope = new ThreadSafeMetricsDelegatingScope(MetricsHelper.getMetricsScope());
return () -> {
try {
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DataFetcherResult.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DataFetcherResult.java
new file mode 100644
index 00000000..a7121ff2
--- /dev/null
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DataFetcherResult.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. Licensed under the Amazon Software License
+ * (the "License"). You may not use this file except in compliance with the License. A copy of the License is located at
+ * http://aws.amazon.com/asl/ or in the "license" file accompanying this file. This file is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific
+ * language governing permissions and limitations under the License.
+ */
+package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
+
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+
+/**
+ * Represents the result from the DataFetcher, and allows the receiver to accept a result
+ */
+public interface DataFetcherResult {
+ /**
+ * The result of the request to Kinesis
+ *
+ * @return The result of the request, this can be null if the request failed.
+ */
+ GetRecordsResult getResult();
+
+ /**
+ * Accepts the result, and advances the shard iterator. A result from the data fetcher must be accepted before any
+ * further progress can be made.
+ *
+ * @return the result of the request, this can be null if the request failed.
+ */
+ GetRecordsResult accept();
+
+ /**
+ * Indicates whether this result is at the end of the shard or not
+ *
+ * @return true if the result is at the end of a shard, false otherwise
+ */
+ boolean isShardEnd();
+}
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java
index 2ce3152a..dec0ac5e 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java
@@ -14,6 +14,7 @@
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
+import lombok.Data;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -26,6 +27,7 @@ import com.amazonaws.services.kinesis.clientlibrary.proxies.MetricsCollectingKin
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import java.util.Date;
+import java.util.function.Consumer;
/**
* Used to get data from Amazon Kinesis. Tracks iterator state internally.
@@ -57,30 +59,69 @@ class KinesisDataFetcher {
* @param maxRecords Max records to fetch
* @return list of records of up to maxRecords size
*/
- public GetRecordsResult getRecords(int maxRecords) {
+ public DataFetcherResult getRecords(int maxRecords) {
if (!isInitialized) {
throw new IllegalArgumentException("KinesisDataFetcher.getRecords called before initialization.");
}
- GetRecordsResult response = null;
+ DataFetcherResult response;
if (nextIterator != null) {
try {
- response = kinesisProxy.get(nextIterator, maxRecords);
- nextIterator = response.getNextShardIterator();
+ response = new AdvancingResult(kinesisProxy.get(nextIterator, maxRecords));
} catch (ResourceNotFoundException e) {
LOG.info("Caught ResourceNotFoundException when fetching records for shard " + shardId);
- nextIterator = null;
- }
- if (nextIterator == null) {
- isShardEndReached = true;
+ response = TERMINAL_RESULT;
}
} else {
- isShardEndReached = true;
+ response = TERMINAL_RESULT;
}
return response;
}
+ final DataFetcherResult TERMINAL_RESULT = new DataFetcherResult() {
+ @Override
+ public GetRecordsResult getResult() {
+ return null;
+ }
+
+ @Override
+ public GetRecordsResult accept() {
+ isShardEndReached = true;
+ return getResult();
+ }
+
+ @Override
+ public boolean isShardEnd() {
+ return isShardEndReached;
+ }
+ };
+
+ @Data
+ private class AdvancingResult implements DataFetcherResult {
+
+ final GetRecordsResult result;
+
+ @Override
+ public GetRecordsResult getResult() {
+ return result;
+ }
+
+ @Override
+ public GetRecordsResult accept() {
+ nextIterator = result.getNextShardIterator();
+ if (nextIterator == null) {
+ isShardEndReached = true;
+ }
+ return getResult();
+ }
+
+ @Override
+ public boolean isShardEnd() {
+ return isShardEndReached;
+ }
+ }
+
/**
* Initializes this KinesisDataFetcher's iterator based on the checkpointed sequence number.
* @param initialCheckpoint Current checkpoint sequence number for this shard.
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SynchronousGetRecordsRetrievalStrategy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SynchronousGetRecordsRetrievalStrategy.java
index 3c8925b0..c862c348 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SynchronousGetRecordsRetrievalStrategy.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SynchronousGetRecordsRetrievalStrategy.java
@@ -28,7 +28,7 @@ public class SynchronousGetRecordsRetrievalStrategy implements GetRecordsRetriev
@Override
public GetRecordsResult getRecords(final int maxRecords) {
- return dataFetcher.getRecords(maxRecords);
+ return dataFetcher.getRecords(maxRecords).accept();
}
@Override
diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategyIntegrationTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategyIntegrationTest.java
index b3659605..8e89204e 100644
--- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategyIntegrationTest.java
+++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategyIntegrationTest.java
@@ -36,7 +36,10 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
-import static org.junit.Assert.assertEquals;
+
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsEqual.equalTo;
import static org.junit.Assert.assertNull;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
@@ -58,17 +61,19 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest {
@Mock
private IKinesisProxy mockKinesisProxy;
-
@Mock
private ShardInfo mockShardInfo;
@Mock
- private Supplier> completionServiceSupplier;
+ private Supplier> completionServiceSupplier;
+ @Mock
+ private DataFetcherResult result;
+ @Mock
+ private GetRecordsResult recordsResult;
- private CompletionService completionService;
+ private CompletionService completionService;
private AsynchronousGetRecordsRetrievalStrategy getRecordsRetrivalStrategy;
private KinesisDataFetcher dataFetcher;
- private GetRecordsResult result;
private ExecutorService executorService;
private RejectedExecutionHandler rejectedExecutionHandler;
private int numberOfRecords = 10;
@@ -86,14 +91,15 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest {
new LinkedBlockingQueue<>(1),
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("getrecords-worker-%d").build(),
rejectedExecutionHandler));
- completionService = spy(new ExecutorCompletionService(executorService));
+ completionService = spy(new ExecutorCompletionService(executorService));
when(completionServiceSupplier.get()).thenReturn(completionService);
getRecordsRetrivalStrategy = new AsynchronousGetRecordsRetrievalStrategy(dataFetcher, executorService, RETRY_GET_RECORDS_IN_SECONDS, completionServiceSupplier, "shardId-0001");
- result = null;
+ when(result.accept()).thenReturn(recordsResult);
}
@Test
public void oneRequestMultithreadTest() {
+ when(result.accept()).thenReturn(null);
GetRecordsResult getRecordsResult = getRecordsRetrivalStrategy.getRecords(numberOfRecords);
verify(dataFetcher, atLeast(getLeastNumberOfCalls())).getRecords(eq(numberOfRecords));
verify(executorService, atLeast(getLeastNumberOfCalls())).execute(any());
@@ -102,27 +108,25 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest {
@Test
public void multiRequestTest() {
- result = mock(GetRecordsResult.class);
-
- ExecutorCompletionService completionService1 = spy(new ExecutorCompletionService(executorService));
+ ExecutorCompletionService completionService1 = spy(new ExecutorCompletionService(executorService));
when(completionServiceSupplier.get()).thenReturn(completionService1);
GetRecordsResult getRecordsResult = getRecordsRetrivalStrategy.getRecords(numberOfRecords);
verify(dataFetcher, atLeast(getLeastNumberOfCalls())).getRecords(numberOfRecords);
verify(executorService, atLeast(getLeastNumberOfCalls())).execute(any());
- assertEquals(result, getRecordsResult);
+ assertThat(getRecordsResult, equalTo(recordsResult));
- result = null;
- ExecutorCompletionService completionService2 = spy(new ExecutorCompletionService(executorService));
+ when(result.accept()).thenReturn(null);
+ ExecutorCompletionService completionService2 = spy(new ExecutorCompletionService(executorService));
when(completionServiceSupplier.get()).thenReturn(completionService2);
getRecordsResult = getRecordsRetrivalStrategy.getRecords(numberOfRecords);
- assertNull(getRecordsResult);
+ assertThat(getRecordsResult, nullValue(GetRecordsResult.class));
}
@Test
@Ignore
public void testInterrupted() throws InterruptedException, ExecutionException {
- Future mockFuture = mock(Future.class);
+ Future mockFuture = mock(Future.class);
when(completionService.submit(any())).thenReturn(mockFuture);
when(completionService.poll()).thenReturn(mockFuture);
doThrow(InterruptedException.class).when(mockFuture).get();
@@ -154,7 +158,7 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest {
}
@Override
- public GetRecordsResult getRecords(final int maxRecords) {
+ public DataFetcherResult getRecords(final int maxRecords) {
try {
Thread.sleep(SLEEP_GET_RECORDS_IN_SECONDS * 1000);
} catch (InterruptedException e) {
diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategyTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategyTest.java
index 820f4a57..aa9e9a24 100644
--- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategyTest.java
+++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategyTest.java
@@ -53,19 +53,23 @@ public class AsynchronousGetRecordsRetrievalStrategyTest {
@Mock
private ExecutorService executorService;
@Mock
- private Supplier> completionServiceSupplier;
+ private Supplier> completionServiceSupplier;
@Mock
- private CompletionService completionService;
+ private CompletionService completionService;
@Mock
- private Future successfulFuture;
+ private Future successfulFuture;
@Mock
- private Future blockedFuture;
+ private Future blockedFuture;
+ @Mock
+ private DataFetcherResult dataFetcherResult;
@Mock
private GetRecordsResult expectedResults;
@Before
public void before() {
when(completionServiceSupplier.get()).thenReturn(completionService);
+ when(dataFetcherResult.getResult()).thenReturn(expectedResults);
+ when(dataFetcherResult.accept()).thenReturn(expectedResults);
}
@Test
@@ -76,7 +80,7 @@ public class AsynchronousGetRecordsRetrievalStrategyTest {
when(executorService.isShutdown()).thenReturn(false);
when(completionService.submit(any())).thenReturn(successfulFuture);
when(completionService.poll(anyLong(), any())).thenReturn(successfulFuture);
- when(successfulFuture.get()).thenReturn(expectedResults);
+ when(successfulFuture.get()).thenReturn(dataFetcherResult);
GetRecordsResult result = strategy.getRecords(10);
@@ -97,7 +101,7 @@ public class AsynchronousGetRecordsRetrievalStrategyTest {
when(executorService.isShutdown()).thenReturn(false);
when(completionService.submit(any())).thenReturn(blockedFuture).thenReturn(successfulFuture);
when(completionService.poll(anyLong(), any())).thenReturn(null).thenReturn(successfulFuture);
- when(successfulFuture.get()).thenReturn(expectedResults);
+ when(successfulFuture.get()).thenReturn(dataFetcherResult);
when(successfulFuture.cancel(anyBoolean())).thenReturn(false);
when(blockedFuture.cancel(anyBoolean())).thenReturn(true);
when(successfulFuture.isCancelled()).thenReturn(false);
@@ -133,7 +137,7 @@ public class AsynchronousGetRecordsRetrievalStrategyTest {
when(executorService.isShutdown()).thenReturn(false);
when(completionService.submit(any())).thenReturn(blockedFuture).thenThrow(new RejectedExecutionException("Rejected!")).thenReturn(successfulFuture);
when(completionService.poll(anyLong(), any())).thenReturn(null).thenReturn(null).thenReturn(successfulFuture);
- when(successfulFuture.get()).thenReturn(expectedResults);
+ when(successfulFuture.get()).thenReturn(dataFetcherResult);
when(successfulFuture.cancel(anyBoolean())).thenReturn(false);
when(blockedFuture.cancel(anyBoolean())).thenReturn(true);
when(successfulFuture.isCancelled()).thenReturn(false);
diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcherTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcherTest.java
index 2b89c3c8..3cc8cb5a 100644
--- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcherTest.java
+++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcherTest.java
@@ -14,9 +14,20 @@
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
@@ -39,12 +50,19 @@ import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
/**
* Unit tests for KinesisDataFetcher.
*/
+@RunWith(MockitoJUnitRunner.class)
public class KinesisDataFetcherTest {
+ @Mock
+ private KinesisProxy kinesisProxy;
+
private static final int MAX_RECORDS = 1;
private static final String SHARD_ID = "shardId-1";
private static final String AT_SEQUENCE_NUMBER = ShardIteratorType.AT_SEQUENCE_NUMBER.toString();
@@ -55,6 +73,7 @@ public class KinesisDataFetcherTest {
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON);
private static final InitialPositionInStreamExtended INITIAL_POSITION_AT_TIMESTAMP =
InitialPositionInStreamExtended.newInitialPositionAtTimestamp(new Date(1000));
+ ;
/**
* @throws java.lang.Exception
@@ -190,6 +209,82 @@ public class KinesisDataFetcherTest {
Assert.assertTrue("Shard should reach the end", dataFetcher.isShardEndReached());
}
+ @Test
+ public void testFetcherDoesNotAdvanceWithoutAccept() {
+ final String INITIAL_ITERATOR = "InitialIterator";
+ final String NEXT_ITERATOR_ONE = "NextIteratorOne";
+ final String NEXT_ITERATOR_TWO = "NextIteratorTwo";
+ when(kinesisProxy.getIterator(anyString(), anyString())).thenReturn(INITIAL_ITERATOR);
+ GetRecordsResult iteratorOneResults = mock(GetRecordsResult.class);
+ when(iteratorOneResults.getNextShardIterator()).thenReturn(NEXT_ITERATOR_ONE);
+ when(kinesisProxy.get(eq(INITIAL_ITERATOR), anyInt())).thenReturn(iteratorOneResults);
+
+ GetRecordsResult iteratorTwoResults = mock(GetRecordsResult.class);
+ when(kinesisProxy.get(eq(NEXT_ITERATOR_ONE), anyInt())).thenReturn(iteratorTwoResults);
+ when(iteratorTwoResults.getNextShardIterator()).thenReturn(NEXT_ITERATOR_TWO);
+
+ GetRecordsResult finalResult = mock(GetRecordsResult.class);
+ when(kinesisProxy.get(eq(NEXT_ITERATOR_TWO), anyInt())).thenReturn(finalResult);
+ when(finalResult.getNextShardIterator()).thenReturn(null);
+
+
+ KinesisDataFetcher dataFetcher = new KinesisDataFetcher(kinesisProxy, SHARD_INFO);
+ dataFetcher.initialize("TRIM_HORIZON", InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON));
+
+ assertNoAdvance(dataFetcher, iteratorOneResults, INITIAL_ITERATOR);
+ assertAdvanced(dataFetcher, iteratorOneResults, INITIAL_ITERATOR, NEXT_ITERATOR_ONE);
+
+ assertNoAdvance(dataFetcher, iteratorTwoResults, NEXT_ITERATOR_ONE);
+ assertAdvanced(dataFetcher, iteratorTwoResults, NEXT_ITERATOR_ONE, NEXT_ITERATOR_TWO);
+
+ assertNoAdvance(dataFetcher, finalResult, NEXT_ITERATOR_TWO);
+ assertAdvanced(dataFetcher, finalResult, NEXT_ITERATOR_TWO, null);
+
+ verify(kinesisProxy, times(2)).get(eq(INITIAL_ITERATOR), anyInt());
+ verify(kinesisProxy, times(2)).get(eq(NEXT_ITERATOR_ONE), anyInt());
+ verify(kinesisProxy, times(2)).get(eq(NEXT_ITERATOR_TWO), anyInt());
+
+ reset(kinesisProxy);
+
+ DataFetcherResult terminal = dataFetcher.getRecords(100);
+ assertThat(terminal.isShardEnd(), equalTo(true));
+ assertThat(terminal.getResult(), nullValue());
+ assertThat(terminal, equalTo(dataFetcher.TERMINAL_RESULT));
+
+ verify(kinesisProxy, never()).get(anyString(), anyInt());
+ }
+
+
+ private DataFetcherResult assertAdvanced(KinesisDataFetcher dataFetcher, GetRecordsResult expectedResult, String previousValue, String nextValue) {
+ DataFetcherResult acceptResult = dataFetcher.getRecords(100);
+ assertThat(acceptResult.getResult(), equalTo(expectedResult));
+
+ assertThat(dataFetcher.getNextIterator(), equalTo(previousValue));
+ assertThat(dataFetcher.isShardEndReached(), equalTo(false));
+
+ assertThat(acceptResult.accept(), equalTo(expectedResult));
+ assertThat(dataFetcher.getNextIterator(), equalTo(nextValue));
+ if (nextValue == null) {
+ assertThat(dataFetcher.isShardEndReached(), equalTo(true));
+ }
+
+ verify(kinesisProxy, times(2)).get(eq(previousValue), anyInt());
+
+ return acceptResult;
+ }
+
+ private DataFetcherResult assertNoAdvance(KinesisDataFetcher dataFetcher, GetRecordsResult expectedResult, String previousValue) {
+ assertThat(dataFetcher.getNextIterator(), equalTo(previousValue));
+ DataFetcherResult noAcceptResult = dataFetcher.getRecords(100);
+ assertThat(noAcceptResult.getResult(), equalTo(expectedResult));
+
+ assertThat(dataFetcher.getNextIterator(), equalTo(previousValue));
+
+ verify(kinesisProxy).get(eq(previousValue), anyInt());
+
+ return noAcceptResult;
+ }
+
private void testInitializeAndFetch(String iteratorType,
String seqNo,
InitialPositionInStreamExtended initialPositionInStream) throws Exception {