Merge Release 1.8.5 (#233)

* Only advance the shard iterator when we accept a result to return

This changes the retriever strategy to only accept the shard iterator
when we have accepted a result to return.  This is for the
asynchronous retriever where multiple threads may contend for the same
iterator slot.  This ensures only the one selected for the response will
advance the shard iterator.

* Release 1.8.5 of the Amazon Kinesis Client for Java (#232)

* Release 1.8.5 of the Amazon Kinesis Client for Java

Release 1.8.5 (September 26, 2017)
* Only advance the shard iterator for the accepted response.  
  This fixes a race condition in the `KinesisDataFetcher` when it's being used to make asynchronous requests.  The shard iterator is now only advanced when the retriever calls `DataFetcherResult#accept()`.
  * PR #230
  * Issue #231

* Change the TerminalResult to return an empty GetRecordsResult

Changes the TerminalResult to provide an empty GetRecordsResult, which
components downstream depend on.

* Fix unit test for behavior change of TerminalResult

Got distracted, and forgot to run the unit tests.
This commit is contained in:
Justin Pfifer 2017-09-27 10:31:04 -07:00 committed by Sahil Palvia
parent c96c8afeac
commit 060913d49c
12 changed files with 252 additions and 82 deletions

View file

@ -2,7 +2,7 @@ Manifest-Version: 1.0
Bundle-ManifestVersion: 2
Bundle-Name: Amazon Kinesis Client Library for Java
Bundle-SymbolicName: com.amazonaws.kinesisclientlibrary;singleton:=true
Bundle-Version: 1.8.4
Bundle-Version: 1.8.5
Bundle-Vendor: Amazon Technologies, Inc
Bundle-RequiredExecutionEnvironment: JavaSE-1.7
Require-Bundle: org.apache.commons.codec;bundle-version="1.6",

View file

@ -29,6 +29,12 @@ For producer-side developers using the **[Kinesis Producer Library (KPL)][kinesi
To make it easier for developers to write record processors in other languages, we have implemented a Java based daemon, called MultiLangDaemon that does all the heavy lifting. Our approach has the daemon spawn a sub-process, which in turn runs the record processor, which can be written in any language. The MultiLangDaemon process and the record processor sub-process communicate with each other over [STDIN and STDOUT using a defined protocol][multi-lang-protocol]. There will be a one to one correspondence amongst record processors, child processes, and shards. For Python developers specifically, we have abstracted these implementation details away and [expose an interface][kclpy] that enables you to focus on writing record processing logic in Python. This approach enables KCL to be language agnostic, while providing identical features and similar parallel processing model across all languages.
## Release Notes
### Release 1.8.5 (September 26, 2017)
* Only advance the shard iterator for the accepted response.
This fixes a race condition in the `KinesisDataFetcher` when it's being used to make asynchronous requests. The shard iterator is now only advanced when the retriever calls `DataFetcherResult#accept()`.
* [PR #230](https://github.com/awslabs/amazon-kinesis-client/pull/230)
* [Issue #231](https://github.com/awslabs/amazon-kinesis-client/issues/231)
### Release 1.8.4 (September 22, 2017)
* Create a new completion service for each request.
This ensures that canceled tasks are discarded. This will prevent a cancellation exception causing issues processing records.

View file

@ -6,7 +6,7 @@
<artifactId>amazon-kinesis-client</artifactId>
<packaging>jar</packaging>
<name>Amazon Kinesis Client Library for Java</name>
<version>1.8.5-SNAPSHOT</version>
<version>1.8.6-SNAPSHOT</version>
<description>The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data
from Amazon Kinesis.
</description>

View file

@ -49,7 +49,7 @@ public class AsynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrie
private final ExecutorService executorService;
private final int retryGetRecordsInSeconds;
private final String shardId;
final Supplier<CompletionService<GetRecordsResult>> completionServiceSupplier;
final Supplier<CompletionService<DataFetcherResult>> completionServiceSupplier;
public AsynchronousGetRecordsRetrievalStrategy(@NonNull final KinesisDataFetcher dataFetcher,
final int retryGetRecordsInSeconds, final int maxGetRecordsThreadPool, String shardId) {
@ -63,7 +63,7 @@ public class AsynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrie
}
AsynchronousGetRecordsRetrievalStrategy(KinesisDataFetcher dataFetcher, ExecutorService executorService,
int retryGetRecordsInSeconds, Supplier<CompletionService<GetRecordsResult>> completionServiceSupplier,
int retryGetRecordsInSeconds, Supplier<CompletionService<DataFetcherResult>> completionServiceSupplier,
String shardId) {
this.dataFetcher = dataFetcher;
this.executorService = executorService;
@ -78,9 +78,9 @@ public class AsynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrie
throw new IllegalStateException("Strategy has been shutdown");
}
GetRecordsResult result = null;
CompletionService<GetRecordsResult> completionService = completionServiceSupplier.get();
Set<Future<GetRecordsResult>> futures = new HashSet<>();
Callable<GetRecordsResult> retrieverCall = createRetrieverCallable(maxRecords);
CompletionService<DataFetcherResult> completionService = completionServiceSupplier.get();
Set<Future<DataFetcherResult>> futures = new HashSet<>();
Callable<DataFetcherResult> retrieverCall = createRetrieverCallable(maxRecords);
while (true) {
try {
futures.add(completionService.submit(retrieverCall));
@ -89,10 +89,15 @@ public class AsynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrie
}
try {
Future<GetRecordsResult> resultFuture = completionService.poll(retryGetRecordsInSeconds,
Future<DataFetcherResult> resultFuture = completionService.poll(retryGetRecordsInSeconds,
TimeUnit.SECONDS);
if (resultFuture != null) {
result = resultFuture.get();
//
// Fix to ensure that we only let the shard iterator advance when we intend to return the result
// to the caller. This ensures that the shard iterator is consistently advance in step with
// what the caller sees.
//
result = resultFuture.get().accept();
break;
}
} catch (ExecutionException e) {
@ -106,7 +111,7 @@ public class AsynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrie
return result;
}
private Callable<GetRecordsResult> createRetrieverCallable(int maxRecords) {
private Callable<DataFetcherResult> createRetrieverCallable(int maxRecords) {
ThreadSafeMetricsDelegatingScope metricsScope = new ThreadSafeMetricsDelegatingScope(MetricsHelper.getMetricsScope());
return () -> {
try {

View file

@ -0,0 +1,37 @@
/*
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. Licensed under the Amazon Software License
* (the "License"). You may not use this file except in compliance with the License. A copy of the License is located at
* http://aws.amazon.com/asl/ or in the "license" file accompanying this file. This file is distributed on an "AS IS"
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific
* language governing permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
/**
* Represents the result from the DataFetcher, and allows the receiver to accept a result
*/
public interface DataFetcherResult {
/**
* The result of the request to Kinesis
*
* @return The result of the request, this can be null if the request failed.
*/
GetRecordsResult getResult();
/**
* Accepts the result, and advances the shard iterator. A result from the data fetcher must be accepted before any
* further progress can be made.
*
* @return the result of the request, this can be null if the request failed.
*/
GetRecordsResult accept();
/**
* Indicates whether this result is at the end of the shard or not
*
* @return true if the result is at the end of a shard, false otherwise
*/
boolean isShardEnd();
}

View file

@ -126,7 +126,7 @@ public class KinesisClientLibConfiguration {
/**
* User agent set when Amazon Kinesis Client Library makes AWS requests.
*/
public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.8.4";
public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.8.5";
/**
* KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls

View file

@ -14,7 +14,6 @@
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.Date;
@ -30,6 +29,8 @@ import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
import lombok.Data;
/**
* Used to get data from Amazon Kinesis. Tracks iterator state internally.
*/
@ -62,34 +63,65 @@ class KinesisDataFetcher {
* @param maxRecords Max records to fetch
* @return list of records of up to maxRecords size
*/
public GetRecordsResult getRecords(int maxRecords) {
public DataFetcherResult getRecords(int maxRecords) {
if (!isInitialized) {
throw new IllegalArgumentException("KinesisDataFetcher.getRecords called before initialization.");
}
GetRecordsResult response = null;
if (nextIterator != null) {
try {
sleepBeforeNextCall();
response = kinesisProxy.get(nextIterator, maxRecords);
lastResponseTime = Instant.now();
nextIterator = response.getNextShardIterator();
return new AdvancingResult(kinesisProxy.get(nextIterator, maxRecords));
} catch (ResourceNotFoundException e) {
LOG.info("Caught ResourceNotFoundException when fetching records for shard " + shardId);
nextIterator = null;
return TERMINAL_RESULT;
}
} else {
return TERMINAL_RESULT;
}
}
final DataFetcherResult TERMINAL_RESULT = new DataFetcherResult() {
@Override
public GetRecordsResult getResult() {
return new GetRecordsResult().withMillisBehindLatest(null).withRecords(Collections.emptyList())
.withNextShardIterator(null);
}
@Override
public GetRecordsResult accept() {
isShardEndReached = true;
return getResult();
}
@Override
public boolean isShardEnd() {
return isShardEndReached;
}
};
@Data
class AdvancingResult implements DataFetcherResult {
final GetRecordsResult result;
@Override
public GetRecordsResult getResult() {
return result;
}
@Override
public GetRecordsResult accept() {
nextIterator = result.getNextShardIterator();
if (nextIterator == null) {
isShardEndReached = true;
}
} else {
isShardEndReached = true;
return getResult();
}
if (response == null) {
response = new GetRecordsResult().withRecords(Collections.emptyList());
@Override
public boolean isShardEnd() {
return isShardEndReached;
}
return response;
}
/**
@ -189,19 +221,6 @@ class KinesisDataFetcher {
return iterator;
}
protected void sleepBeforeNextCall() {
if (lastResponseTime != null) {
long timeDiff = Duration.between(lastResponseTime, Instant.now()).abs().toMillis();
if (timeDiff < idleMillisBetweenCalls) {
try {
Thread.sleep(idleMillisBetweenCalls - timeDiff);
} catch (InterruptedException e) {
LOG.info("Thread interrupted, shutdown possibly called.");
}
}
}
}
/**
* @return the shardEndReached
*/

View file

@ -28,7 +28,7 @@ public class SynchronousGetRecordsRetrievalStrategy implements GetRecordsRetriev
@Override
public GetRecordsResult getRecords(final int maxRecords) {
return dataFetcher.getRecords(maxRecords);
return dataFetcher.getRecords(maxRecords).accept();
}
@Override

View file

@ -36,7 +36,10 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import static org.junit.Assert.assertEquals;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.junit.Assert.assertNull;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
@ -58,20 +61,21 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest {
@Mock
private IKinesisProxy mockKinesisProxy;
@Mock
private ShardInfo mockShardInfo;
@Mock
private Supplier<CompletionService<GetRecordsResult>> completionServiceSupplier;
@Mock
private KinesisClientLibConfiguration configuration;
@Mock
private Supplier<CompletionService<DataFetcherResult>> completionServiceSupplier;
@Mock
private DataFetcherResult result;
@Mock
private GetRecordsResult recordsResult;
private CompletionService<GetRecordsResult> completionService;
private CompletionService<DataFetcherResult> completionService;
private AsynchronousGetRecordsRetrievalStrategy getRecordsRetrivalStrategy;
private KinesisDataFetcher dataFetcher;
private GetRecordsResult result;
private ExecutorService executorService;
private RejectedExecutionHandler rejectedExecutionHandler;
private int numberOfRecords = 10;
@ -89,16 +93,15 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest {
new LinkedBlockingQueue<>(1),
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("getrecords-worker-%d").build(),
rejectedExecutionHandler));
completionService = spy(new ExecutorCompletionService<GetRecordsResult>(executorService));
completionService = spy(new ExecutorCompletionService<DataFetcherResult>(executorService));
when(completionServiceSupplier.get()).thenReturn(completionService);
getRecordsRetrivalStrategy = new AsynchronousGetRecordsRetrievalStrategy(
dataFetcher, executorService, RETRY_GET_RECORDS_IN_SECONDS, completionServiceSupplier, "shardId-0001");
result = null;
when(configuration.getIdleMillisBetweenCalls()).thenReturn(500L);
getRecordsRetrivalStrategy = new AsynchronousGetRecordsRetrievalStrategy(dataFetcher, executorService, RETRY_GET_RECORDS_IN_SECONDS, completionServiceSupplier, "shardId-0001");
when(result.accept()).thenReturn(recordsResult);
}
@Test
public void oneRequestMultithreadTest() {
when(result.accept()).thenReturn(null);
GetRecordsResult getRecordsResult = getRecordsRetrivalStrategy.getRecords(numberOfRecords);
verify(dataFetcher, atLeast(getLeastNumberOfCalls())).getRecords(eq(numberOfRecords));
verify(executorService, atLeast(getLeastNumberOfCalls())).execute(any());
@ -107,26 +110,24 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest {
@Test
public void multiRequestTest() {
result = mock(GetRecordsResult.class);
ExecutorCompletionService<GetRecordsResult> completionService1 = spy(new ExecutorCompletionService<GetRecordsResult>(executorService));
ExecutorCompletionService<DataFetcherResult> completionService1 = spy(new ExecutorCompletionService<DataFetcherResult>(executorService));
when(completionServiceSupplier.get()).thenReturn(completionService1);
GetRecordsResult getRecordsResult = getRecordsRetrivalStrategy.getRecords(numberOfRecords);
verify(dataFetcher, atLeast(getLeastNumberOfCalls())).getRecords(numberOfRecords);
verify(executorService, atLeast(getLeastNumberOfCalls())).execute(any());
assertEquals(result, getRecordsResult);
assertThat(getRecordsResult, equalTo(recordsResult));
result = null;
ExecutorCompletionService<GetRecordsResult> completionService2 = spy(new ExecutorCompletionService<GetRecordsResult>(executorService));
when(result.accept()).thenReturn(null);
ExecutorCompletionService<DataFetcherResult> completionService2 = spy(new ExecutorCompletionService<DataFetcherResult>(executorService));
when(completionServiceSupplier.get()).thenReturn(completionService2);
getRecordsResult = getRecordsRetrivalStrategy.getRecords(numberOfRecords);
assertNull(getRecordsResult);
assertThat(getRecordsResult, nullValue(GetRecordsResult.class));
}
@Test
@Ignore
public void testInterrupted() throws InterruptedException, ExecutionException {
Future<GetRecordsResult> mockFuture = mock(Future.class);
Future<DataFetcherResult> mockFuture = mock(Future.class);
when(completionService.submit(any())).thenReturn(mockFuture);
when(completionService.poll()).thenReturn(mockFuture);
doThrow(InterruptedException.class).when(mockFuture).get();
@ -159,7 +160,7 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest {
}
@Override
public GetRecordsResult getRecords(final int maxRecords) {
public DataFetcherResult getRecords(final int maxRecords) {
try {
Thread.sleep(SLEEP_GET_RECORDS_IN_SECONDS * 1000);
} catch (InterruptedException e) {

View file

@ -53,19 +53,23 @@ public class AsynchronousGetRecordsRetrievalStrategyTest {
@Mock
private ExecutorService executorService;
@Mock
private Supplier<CompletionService<GetRecordsResult>> completionServiceSupplier;
private Supplier<CompletionService<DataFetcherResult>> completionServiceSupplier;
@Mock
private CompletionService<GetRecordsResult> completionService;
private CompletionService<DataFetcherResult> completionService;
@Mock
private Future<GetRecordsResult> successfulFuture;
private Future<DataFetcherResult> successfulFuture;
@Mock
private Future<GetRecordsResult> blockedFuture;
private Future<DataFetcherResult> blockedFuture;
@Mock
private DataFetcherResult dataFetcherResult;
@Mock
private GetRecordsResult expectedResults;
@Before
public void before() {
when(completionServiceSupplier.get()).thenReturn(completionService);
when(dataFetcherResult.getResult()).thenReturn(expectedResults);
when(dataFetcherResult.accept()).thenReturn(expectedResults);
}
@Test
@ -76,7 +80,7 @@ public class AsynchronousGetRecordsRetrievalStrategyTest {
when(executorService.isShutdown()).thenReturn(false);
when(completionService.submit(any())).thenReturn(successfulFuture);
when(completionService.poll(anyLong(), any())).thenReturn(successfulFuture);
when(successfulFuture.get()).thenReturn(expectedResults);
when(successfulFuture.get()).thenReturn(dataFetcherResult);
GetRecordsResult result = strategy.getRecords(10);
@ -97,7 +101,7 @@ public class AsynchronousGetRecordsRetrievalStrategyTest {
when(executorService.isShutdown()).thenReturn(false);
when(completionService.submit(any())).thenReturn(blockedFuture).thenReturn(successfulFuture);
when(completionService.poll(anyLong(), any())).thenReturn(null).thenReturn(successfulFuture);
when(successfulFuture.get()).thenReturn(expectedResults);
when(successfulFuture.get()).thenReturn(dataFetcherResult);
when(successfulFuture.cancel(anyBoolean())).thenReturn(false);
when(blockedFuture.cancel(anyBoolean())).thenReturn(true);
when(successfulFuture.isCancelled()).thenReturn(false);
@ -133,7 +137,7 @@ public class AsynchronousGetRecordsRetrievalStrategyTest {
when(executorService.isShutdown()).thenReturn(false);
when(completionService.submit(any())).thenReturn(blockedFuture).thenThrow(new RejectedExecutionException("Rejected!")).thenReturn(successfulFuture);
when(completionService.poll(anyLong(), any())).thenReturn(null).thenReturn(null).thenReturn(successfulFuture);
when(successfulFuture.get()).thenReturn(expectedResults);
when(successfulFuture.get()).thenReturn(dataFetcherResult);
when(successfulFuture.cancel(anyBoolean())).thenReturn(false);
when(blockedFuture.cancel(anyBoolean())).thenReturn(true);
when(successfulFuture.isCancelled()).thenReturn(false);

View file

@ -14,18 +14,36 @@
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.collection.IsEmptyCollection.empty;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
@ -39,9 +57,6 @@ import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
/**
* Unit tests for KinesisDataFetcher.
@ -49,6 +64,11 @@ import org.mockito.runners.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class KinesisDataFetcherTest {
@Mock
private KinesisProxy kinesisProxy;
@Mock
private KinesisClientLibConfiguration configuration;
private static final int MAX_RECORDS = 1;
private static final String SHARD_ID = "shardId-1";
private static final String AT_SEQUENCE_NUMBER = ShardIteratorType.AT_SEQUENCE_NUMBER.toString();
@ -60,9 +80,6 @@ public class KinesisDataFetcherTest {
private static final InitialPositionInStreamExtended INITIAL_POSITION_AT_TIMESTAMP =
InitialPositionInStreamExtended.newInitialPositionAtTimestamp(new Date(1000));
@Mock
private KinesisClientLibConfiguration configuration;
/**
* @throws java.lang.Exception
*/
@ -212,10 +229,90 @@ public class KinesisDataFetcherTest {
KinesisDataFetcher dataFetcher = new KinesisDataFetcher(mockProxy, SHARD_INFO, configuration);
dataFetcher.initialize(SentinelCheckpoint.LATEST.toString(), INITIAL_POSITION_LATEST);
GetRecordsResult getRecordsResult = dataFetcher.getRecords(maxRecords);
DataFetcherResult dataFetcherResult = dataFetcher.getRecords(maxRecords);
Assert.assertNotNull(getRecordsResult);
Assert.assertTrue(getRecordsResult.getRecords().isEmpty());
assertThat(dataFetcherResult, notNullValue());
}
@Test
public void testFetcherDoesNotAdvanceWithoutAccept() {
final String INITIAL_ITERATOR = "InitialIterator";
final String NEXT_ITERATOR_ONE = "NextIteratorOne";
final String NEXT_ITERATOR_TWO = "NextIteratorTwo";
when(kinesisProxy.getIterator(anyString(), anyString())).thenReturn(INITIAL_ITERATOR);
GetRecordsResult iteratorOneResults = mock(GetRecordsResult.class);
when(iteratorOneResults.getNextShardIterator()).thenReturn(NEXT_ITERATOR_ONE);
when(kinesisProxy.get(eq(INITIAL_ITERATOR), anyInt())).thenReturn(iteratorOneResults);
GetRecordsResult iteratorTwoResults = mock(GetRecordsResult.class);
when(kinesisProxy.get(eq(NEXT_ITERATOR_ONE), anyInt())).thenReturn(iteratorTwoResults);
when(iteratorTwoResults.getNextShardIterator()).thenReturn(NEXT_ITERATOR_TWO);
GetRecordsResult finalResult = mock(GetRecordsResult.class);
when(kinesisProxy.get(eq(NEXT_ITERATOR_TWO), anyInt())).thenReturn(finalResult);
when(finalResult.getNextShardIterator()).thenReturn(null);
KinesisDataFetcher dataFetcher = new KinesisDataFetcher(kinesisProxy, SHARD_INFO, configuration);
dataFetcher.initialize("TRIM_HORIZON",
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON));
assertNoAdvance(dataFetcher, iteratorOneResults, INITIAL_ITERATOR);
assertAdvanced(dataFetcher, iteratorOneResults, INITIAL_ITERATOR, NEXT_ITERATOR_ONE);
assertNoAdvance(dataFetcher, iteratorTwoResults, NEXT_ITERATOR_ONE);
assertAdvanced(dataFetcher, iteratorTwoResults, NEXT_ITERATOR_ONE, NEXT_ITERATOR_TWO);
assertNoAdvance(dataFetcher, finalResult, NEXT_ITERATOR_TWO);
assertAdvanced(dataFetcher, finalResult, NEXT_ITERATOR_TWO, null);
verify(kinesisProxy, times(2)).get(eq(INITIAL_ITERATOR), anyInt());
verify(kinesisProxy, times(2)).get(eq(NEXT_ITERATOR_ONE), anyInt());
verify(kinesisProxy, times(2)).get(eq(NEXT_ITERATOR_TWO), anyInt());
reset(kinesisProxy);
DataFetcherResult terminal = dataFetcher.getRecords(100);
assertThat(terminal.isShardEnd(), equalTo(true));
assertThat(terminal.getResult(), notNullValue());
GetRecordsResult terminalResult = terminal.getResult();
assertThat(terminalResult.getRecords(), notNullValue());
assertThat(terminalResult.getRecords(), empty());
assertThat(terminalResult.getNextShardIterator(), nullValue());
assertThat(terminal, equalTo(dataFetcher.TERMINAL_RESULT));
verify(kinesisProxy, never()).get(anyString(), anyInt());
}
private DataFetcherResult assertAdvanced(KinesisDataFetcher dataFetcher, GetRecordsResult expectedResult,
String previousValue, String nextValue) {
DataFetcherResult acceptResult = dataFetcher.getRecords(100);
assertThat(acceptResult.getResult(), equalTo(expectedResult));
assertThat(dataFetcher.getNextIterator(), equalTo(previousValue));
assertThat(dataFetcher.isShardEndReached(), equalTo(false));
assertThat(acceptResult.accept(), equalTo(expectedResult));
assertThat(dataFetcher.getNextIterator(), equalTo(nextValue));
if (nextValue == null) {
assertThat(dataFetcher.isShardEndReached(), equalTo(true));
}
verify(kinesisProxy, times(2)).get(eq(previousValue), anyInt());
return acceptResult;
}
private DataFetcherResult assertNoAdvance(KinesisDataFetcher dataFetcher, GetRecordsResult expectedResult,
String previousValue) {
assertThat(dataFetcher.getNextIterator(), equalTo(previousValue));
DataFetcherResult noAcceptResult = dataFetcher.getRecords(100);
assertThat(noAcceptResult.getResult(), equalTo(expectedResult));
assertThat(dataFetcher.getNextIterator(), equalTo(previousValue));
verify(kinesisProxy).get(eq(previousValue), anyInt());
return noAcceptResult;
}
private void testInitializeAndFetch(String iteratorType,

View file

@ -181,14 +181,15 @@ public class PrefetchGetRecordsCacheIntegrationTest {
}
@Override
public GetRecordsResult getRecords(final int maxRecords) {
public DataFetcherResult getRecords(final int maxRecords) {
GetRecordsResult getRecordsResult = new GetRecordsResult();
getRecordsResult.setRecords(new ArrayList<>(records));
getRecordsResult.setMillisBehindLatest(1000L);
sleepBeforeNextCall();
return getRecordsResult;
return new AdvancingResult(getRecordsResult);
}
}
}