Changing naming as per CR comments.
This commit is contained in:
parent
5d1d38b5a1
commit
2c37bd0b5c
5 changed files with 22 additions and 22 deletions
|
|
@ -8,7 +8,7 @@ import lombok.NonNull;
|
|||
*
|
||||
*/
|
||||
@Data
|
||||
public class DefaultGetRecordsExecutor implements GetRecordsExecutor {
|
||||
public class DefaultGetRecordsRetrivalStrategy implements GetRecordsRetrivalStrategy {
|
||||
@NonNull
|
||||
private final KinesisDataFetcher dataFetcher;
|
||||
|
||||
|
|
@ -5,6 +5,6 @@ import com.amazonaws.services.kinesis.model.GetRecordsResult;
|
|||
/**
|
||||
*
|
||||
*/
|
||||
public interface GetRecordsExecutor {
|
||||
public interface GetRecordsRetrivalStrategy {
|
||||
GetRecordsResult getRecords(int maxRecords);
|
||||
}
|
||||
|
|
@ -62,7 +62,7 @@ class ProcessTask implements ITask {
|
|||
private final Shard shard;
|
||||
private final ThrottlingReporter throttlingReporter;
|
||||
|
||||
private final GetRecordsExecutor getRecordsExecutor;
|
||||
private final GetRecordsRetrivalStrategy getRecordsRetrivalStrategy;
|
||||
|
||||
/**
|
||||
* @param shardInfo
|
||||
|
|
@ -83,7 +83,7 @@ class ProcessTask implements ITask {
|
|||
long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist) {
|
||||
this(shardInfo, streamConfig, recordProcessor, recordProcessorCheckpointer, dataFetcher, backoffTimeMillis,
|
||||
skipShardSyncAtWorkerInitializationIfLeasesExist,
|
||||
new ThrottlingReporter(MAX_CONSECUTIVE_THROTTLES, shardInfo.getShardId()), new DefaultGetRecordsExecutor(dataFetcher));
|
||||
new ThrottlingReporter(MAX_CONSECUTIVE_THROTTLES, shardInfo.getShardId()), new DefaultGetRecordsRetrivalStrategy(dataFetcher));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -105,7 +105,7 @@ class ProcessTask implements ITask {
|
|||
public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor,
|
||||
RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher,
|
||||
long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
|
||||
ThrottlingReporter throttlingReporter, GetRecordsExecutor getRecordsExecutor) {
|
||||
ThrottlingReporter throttlingReporter, GetRecordsRetrivalStrategy getRecordsRetrivalStrategy) {
|
||||
super();
|
||||
this.shardInfo = shardInfo;
|
||||
this.recordProcessor = recordProcessor;
|
||||
|
|
@ -115,7 +115,7 @@ class ProcessTask implements ITask {
|
|||
this.backoffTimeMillis = backoffTimeMillis;
|
||||
this.throttlingReporter = throttlingReporter;
|
||||
IKinesisProxy kinesisProxy = this.streamConfig.getStreamProxy();
|
||||
this.getRecordsExecutor = getRecordsExecutor;
|
||||
this.getRecordsRetrivalStrategy = getRecordsRetrivalStrategy;
|
||||
// If skipShardSyncAtWorkerInitializationIfLeasesExist is set, we will not get the shard for
|
||||
// this ProcessTask. In this case, duplicate KPL user records in the event of resharding will
|
||||
// not be dropped during deaggregation of Amazon Kinesis records. This is only applicable if
|
||||
|
|
@ -371,7 +371,7 @@ class ProcessTask implements ITask {
|
|||
* @return list of data records from Kinesis
|
||||
*/
|
||||
private GetRecordsResult getRecordsResultAndRecordMillisBehindLatest() {
|
||||
final GetRecordsResult getRecordsResult = getRecordsExecutor.getRecords(streamConfig.getMaxRecords());
|
||||
final GetRecordsResult getRecordsResult = getRecordsRetrivalStrategy.getRecords(streamConfig.getMaxRecords());
|
||||
|
||||
if (getRecordsResult == null) {
|
||||
// Stream no longer exists
|
||||
|
|
|
|||
|
|
@ -117,7 +117,7 @@ public class KinesisDataFetcherTest {
|
|||
ICheckpoint checkpoint = mock(ICheckpoint.class);
|
||||
|
||||
KinesisDataFetcher fetcher = new KinesisDataFetcher(kinesis, SHARD_INFO);
|
||||
GetRecordsExecutor getRecordsExecutor = new DefaultGetRecordsExecutor(fetcher);
|
||||
GetRecordsRetrivalStrategy getRecordsRetrivalStrategy = new DefaultGetRecordsRetrivalStrategy(fetcher);
|
||||
|
||||
String iteratorA = "foo";
|
||||
String iteratorB = "bar";
|
||||
|
|
@ -139,10 +139,10 @@ public class KinesisDataFetcherTest {
|
|||
fetcher.initialize(seqA, null);
|
||||
|
||||
fetcher.advanceIteratorTo(seqA, null);
|
||||
Assert.assertEquals(recordsA, getRecordsExecutor.getRecords(MAX_RECORDS).getRecords());
|
||||
Assert.assertEquals(recordsA, getRecordsRetrivalStrategy.getRecords(MAX_RECORDS).getRecords());
|
||||
|
||||
fetcher.advanceIteratorTo(seqB, null);
|
||||
Assert.assertEquals(recordsB, getRecordsExecutor.getRecords(MAX_RECORDS).getRecords());
|
||||
Assert.assertEquals(recordsB, getRecordsRetrivalStrategy.getRecords(MAX_RECORDS).getRecords());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -182,9 +182,9 @@ public class KinesisDataFetcherTest {
|
|||
// Create data fectcher and initialize it with latest type checkpoint
|
||||
KinesisDataFetcher dataFetcher = new KinesisDataFetcher(mockProxy, SHARD_INFO);
|
||||
dataFetcher.initialize(SentinelCheckpoint.LATEST.toString(), INITIAL_POSITION_LATEST);
|
||||
GetRecordsExecutor getRecordsExecutor = new DefaultGetRecordsExecutor(dataFetcher);
|
||||
GetRecordsRetrivalStrategy getRecordsRetrivalStrategy = new DefaultGetRecordsRetrivalStrategy(dataFetcher);
|
||||
// Call getRecords of dataFetcher which will throw an exception
|
||||
getRecordsExecutor.getRecords(maxRecords);
|
||||
getRecordsRetrivalStrategy.getRecords(maxRecords);
|
||||
|
||||
// Test shard has reached the end
|
||||
Assert.assertTrue("Shard should reach the end", dataFetcher.isShardEndReached());
|
||||
|
|
@ -208,9 +208,9 @@ public class KinesisDataFetcherTest {
|
|||
when(checkpoint.getCheckpoint(SHARD_ID)).thenReturn(new ExtendedSequenceNumber(seqNo));
|
||||
|
||||
KinesisDataFetcher fetcher = new KinesisDataFetcher(kinesis, SHARD_INFO);
|
||||
GetRecordsExecutor getRecordsExecutor = new DefaultGetRecordsExecutor(fetcher);
|
||||
GetRecordsRetrivalStrategy getRecordsRetrivalStrategy = new DefaultGetRecordsRetrivalStrategy(fetcher);
|
||||
fetcher.initialize(seqNo, initialPositionInStream);
|
||||
List<Record> actualRecords = getRecordsExecutor.getRecords(MAX_RECORDS).getRecords();
|
||||
List<Record> actualRecords = getRecordsRetrivalStrategy.getRecords(MAX_RECORDS).getRecords();
|
||||
|
||||
Assert.assertEquals(expectedRecords, actualRecords);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -77,7 +77,7 @@ public class ProcessTaskTest {
|
|||
@Mock
|
||||
private ThrottlingReporter throttlingReporter;
|
||||
@Mock
|
||||
private GetRecordsExecutor mockGetRecordsExecutor;
|
||||
private GetRecordsRetrivalStrategy mockGetRecordsRetrivalStrategy;
|
||||
|
||||
private List<Record> processedRecords;
|
||||
private ExtendedSequenceNumber newLargestPermittedCheckpointValue;
|
||||
|
|
@ -96,20 +96,20 @@ public class ProcessTaskTest {
|
|||
final ShardInfo shardInfo = new ShardInfo(shardId, null, null, null);
|
||||
processTask = new ProcessTask(
|
||||
shardInfo, config, mockRecordProcessor, mockCheckpointer, mockDataFetcher, taskBackoffTimeMillis,
|
||||
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, throttlingReporter, mockGetRecordsExecutor);
|
||||
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, throttlingReporter, mockGetRecordsRetrivalStrategy);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testProcessTaskWithProvisionedThroughputExceededException() {
|
||||
// Set data fetcher to throw exception
|
||||
doReturn(false).when(mockDataFetcher).isShardEndReached();
|
||||
doThrow(new ProvisionedThroughputExceededException("Test Exception")).when(mockGetRecordsExecutor)
|
||||
doThrow(new ProvisionedThroughputExceededException("Test Exception")).when(mockGetRecordsRetrivalStrategy)
|
||||
.getRecords(maxRecords);
|
||||
|
||||
TaskResult result = processTask.call();
|
||||
verify(throttlingReporter).throttled();
|
||||
verify(throttlingReporter, never()).success();
|
||||
verify(mockGetRecordsExecutor).getRecords(eq(maxRecords));
|
||||
verify(mockGetRecordsRetrivalStrategy).getRecords(eq(maxRecords));
|
||||
assertTrue("Result should contain ProvisionedThroughputExceededException",
|
||||
result.getException() instanceof ProvisionedThroughputExceededException);
|
||||
}
|
||||
|
|
@ -117,10 +117,10 @@ public class ProcessTaskTest {
|
|||
@Test
|
||||
public void testProcessTaskWithNonExistentStream() {
|
||||
// Data fetcher returns a null Result when the stream does not exist
|
||||
doReturn(null).when(mockGetRecordsExecutor).getRecords(maxRecords);
|
||||
doReturn(null).when(mockGetRecordsRetrivalStrategy).getRecords(maxRecords);
|
||||
|
||||
TaskResult result = processTask.call();
|
||||
verify(mockGetRecordsExecutor).getRecords(eq(maxRecords));
|
||||
verify(mockGetRecordsRetrivalStrategy).getRecords(eq(maxRecords));
|
||||
assertNull("Task should not throw an exception", result.getException());
|
||||
}
|
||||
|
||||
|
|
@ -304,14 +304,14 @@ public class ProcessTaskTest {
|
|||
private void testWithRecords(List<Record> records,
|
||||
ExtendedSequenceNumber lastCheckpointValue,
|
||||
ExtendedSequenceNumber largestPermittedCheckpointValue) {
|
||||
when(mockGetRecordsExecutor.getRecords(anyInt())).thenReturn(
|
||||
when(mockGetRecordsRetrivalStrategy.getRecords(anyInt())).thenReturn(
|
||||
new GetRecordsResult().withRecords(records));
|
||||
when(mockCheckpointer.getLastCheckpointValue()).thenReturn(lastCheckpointValue);
|
||||
when(mockCheckpointer.getLargestPermittedCheckpointValue()).thenReturn(largestPermittedCheckpointValue);
|
||||
processTask.call();
|
||||
verify(throttlingReporter).success();
|
||||
verify(throttlingReporter, never()).throttled();
|
||||
verify(mockGetRecordsExecutor).getRecords(anyInt());
|
||||
verify(mockGetRecordsRetrivalStrategy).getRecords(anyInt());
|
||||
ArgumentCaptor<ProcessRecordsInput> priCaptor = ArgumentCaptor.forClass(ProcessRecordsInput.class);
|
||||
verify(mockRecordProcessor).processRecords(priCaptor.capture());
|
||||
processedRecords = priCaptor.getValue().getRecords();
|
||||
|
|
|
|||
Loading…
Reference in a new issue