diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DefaultGetRecordsExecutor.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DefaultGetRecordsRetrivalStrategy.java similarity index 81% rename from src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DefaultGetRecordsExecutor.java rename to src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DefaultGetRecordsRetrivalStrategy.java index ba301e0e..4b5a3e64 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DefaultGetRecordsExecutor.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DefaultGetRecordsRetrivalStrategy.java @@ -8,7 +8,7 @@ import lombok.NonNull; * */ @Data -public class DefaultGetRecordsExecutor implements GetRecordsExecutor { +public class DefaultGetRecordsRetrivalStrategy implements GetRecordsRetrivalStrategy { @NonNull private final KinesisDataFetcher dataFetcher; diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsExecutor.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsRetrivalStrategy.java similarity index 80% rename from src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsExecutor.java rename to src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsRetrivalStrategy.java index 272611d8..ed3d3f93 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsExecutor.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsRetrivalStrategy.java @@ -5,6 +5,6 @@ import com.amazonaws.services.kinesis.model.GetRecordsResult; /** * */ -public interface GetRecordsExecutor { +public interface GetRecordsRetrivalStrategy { GetRecordsResult getRecords(int maxRecords); } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java index 8ffe7d3f..cb3ddb2e 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java @@ -62,7 +62,7 @@ class ProcessTask implements ITask { private final Shard shard; private final ThrottlingReporter throttlingReporter; - private final GetRecordsExecutor getRecordsExecutor; + private final GetRecordsRetrivalStrategy getRecordsRetrivalStrategy; /** * @param shardInfo @@ -83,7 +83,7 @@ class ProcessTask implements ITask { long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist) { this(shardInfo, streamConfig, recordProcessor, recordProcessorCheckpointer, dataFetcher, backoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, - new ThrottlingReporter(MAX_CONSECUTIVE_THROTTLES, shardInfo.getShardId()), new DefaultGetRecordsExecutor(dataFetcher)); + new ThrottlingReporter(MAX_CONSECUTIVE_THROTTLES, shardInfo.getShardId()), new DefaultGetRecordsRetrivalStrategy(dataFetcher)); } /** @@ -105,7 +105,7 @@ class ProcessTask implements ITask { public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor, RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher, long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, - ThrottlingReporter throttlingReporter, GetRecordsExecutor getRecordsExecutor) { + ThrottlingReporter throttlingReporter, GetRecordsRetrivalStrategy getRecordsRetrivalStrategy) { super(); this.shardInfo = shardInfo; this.recordProcessor = recordProcessor; @@ -115,7 +115,7 @@ class ProcessTask implements ITask { this.backoffTimeMillis = backoffTimeMillis; this.throttlingReporter = throttlingReporter; IKinesisProxy kinesisProxy = this.streamConfig.getStreamProxy(); - this.getRecordsExecutor = getRecordsExecutor; + this.getRecordsRetrivalStrategy = getRecordsRetrivalStrategy; // If skipShardSyncAtWorkerInitializationIfLeasesExist is set, we will not get the shard for // this ProcessTask. In this case, duplicate KPL user records in the event of resharding will // not be dropped during deaggregation of Amazon Kinesis records. This is only applicable if @@ -371,7 +371,7 @@ class ProcessTask implements ITask { * @return list of data records from Kinesis */ private GetRecordsResult getRecordsResultAndRecordMillisBehindLatest() { - final GetRecordsResult getRecordsResult = getRecordsExecutor.getRecords(streamConfig.getMaxRecords()); + final GetRecordsResult getRecordsResult = getRecordsRetrivalStrategy.getRecords(streamConfig.getMaxRecords()); if (getRecordsResult == null) { // Stream no longer exists diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcherTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcherTest.java index 9d610edd..ce5b8949 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcherTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcherTest.java @@ -117,7 +117,7 @@ public class KinesisDataFetcherTest { ICheckpoint checkpoint = mock(ICheckpoint.class); KinesisDataFetcher fetcher = new KinesisDataFetcher(kinesis, SHARD_INFO); - GetRecordsExecutor getRecordsExecutor = new DefaultGetRecordsExecutor(fetcher); + GetRecordsRetrivalStrategy getRecordsRetrivalStrategy = new DefaultGetRecordsRetrivalStrategy(fetcher); String iteratorA = "foo"; String iteratorB = "bar"; @@ -139,10 +139,10 @@ public class KinesisDataFetcherTest { fetcher.initialize(seqA, null); fetcher.advanceIteratorTo(seqA, null); - Assert.assertEquals(recordsA, getRecordsExecutor.getRecords(MAX_RECORDS).getRecords()); + Assert.assertEquals(recordsA, getRecordsRetrivalStrategy.getRecords(MAX_RECORDS).getRecords()); fetcher.advanceIteratorTo(seqB, null); - Assert.assertEquals(recordsB, getRecordsExecutor.getRecords(MAX_RECORDS).getRecords()); + Assert.assertEquals(recordsB, getRecordsRetrivalStrategy.getRecords(MAX_RECORDS).getRecords()); } @Test @@ -182,9 +182,9 @@ public class KinesisDataFetcherTest { // Create data fectcher and initialize it with latest type checkpoint KinesisDataFetcher dataFetcher = new KinesisDataFetcher(mockProxy, SHARD_INFO); dataFetcher.initialize(SentinelCheckpoint.LATEST.toString(), INITIAL_POSITION_LATEST); - GetRecordsExecutor getRecordsExecutor = new DefaultGetRecordsExecutor(dataFetcher); + GetRecordsRetrivalStrategy getRecordsRetrivalStrategy = new DefaultGetRecordsRetrivalStrategy(dataFetcher); // Call getRecords of dataFetcher which will throw an exception - getRecordsExecutor.getRecords(maxRecords); + getRecordsRetrivalStrategy.getRecords(maxRecords); // Test shard has reached the end Assert.assertTrue("Shard should reach the end", dataFetcher.isShardEndReached()); @@ -208,9 +208,9 @@ public class KinesisDataFetcherTest { when(checkpoint.getCheckpoint(SHARD_ID)).thenReturn(new ExtendedSequenceNumber(seqNo)); KinesisDataFetcher fetcher = new KinesisDataFetcher(kinesis, SHARD_INFO); - GetRecordsExecutor getRecordsExecutor = new DefaultGetRecordsExecutor(fetcher); + GetRecordsRetrivalStrategy getRecordsRetrivalStrategy = new DefaultGetRecordsRetrivalStrategy(fetcher); fetcher.initialize(seqNo, initialPositionInStream); - List actualRecords = getRecordsExecutor.getRecords(MAX_RECORDS).getRecords(); + List actualRecords = getRecordsRetrivalStrategy.getRecords(MAX_RECORDS).getRecords(); Assert.assertEquals(expectedRecords, actualRecords); } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java index 890143f6..f704a7c4 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java @@ -77,7 +77,7 @@ public class ProcessTaskTest { @Mock private ThrottlingReporter throttlingReporter; @Mock - private GetRecordsExecutor mockGetRecordsExecutor; + private GetRecordsRetrivalStrategy mockGetRecordsRetrivalStrategy; private List processedRecords; private ExtendedSequenceNumber newLargestPermittedCheckpointValue; @@ -96,20 +96,20 @@ public class ProcessTaskTest { final ShardInfo shardInfo = new ShardInfo(shardId, null, null, null); processTask = new ProcessTask( shardInfo, config, mockRecordProcessor, mockCheckpointer, mockDataFetcher, taskBackoffTimeMillis, - KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, throttlingReporter, mockGetRecordsExecutor); + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, throttlingReporter, mockGetRecordsRetrivalStrategy); } @Test public void testProcessTaskWithProvisionedThroughputExceededException() { // Set data fetcher to throw exception doReturn(false).when(mockDataFetcher).isShardEndReached(); - doThrow(new ProvisionedThroughputExceededException("Test Exception")).when(mockGetRecordsExecutor) + doThrow(new ProvisionedThroughputExceededException("Test Exception")).when(mockGetRecordsRetrivalStrategy) .getRecords(maxRecords); TaskResult result = processTask.call(); verify(throttlingReporter).throttled(); verify(throttlingReporter, never()).success(); - verify(mockGetRecordsExecutor).getRecords(eq(maxRecords)); + verify(mockGetRecordsRetrivalStrategy).getRecords(eq(maxRecords)); assertTrue("Result should contain ProvisionedThroughputExceededException", result.getException() instanceof ProvisionedThroughputExceededException); } @@ -117,10 +117,10 @@ public class ProcessTaskTest { @Test public void testProcessTaskWithNonExistentStream() { // Data fetcher returns a null Result when the stream does not exist - doReturn(null).when(mockGetRecordsExecutor).getRecords(maxRecords); + doReturn(null).when(mockGetRecordsRetrivalStrategy).getRecords(maxRecords); TaskResult result = processTask.call(); - verify(mockGetRecordsExecutor).getRecords(eq(maxRecords)); + verify(mockGetRecordsRetrivalStrategy).getRecords(eq(maxRecords)); assertNull("Task should not throw an exception", result.getException()); } @@ -304,14 +304,14 @@ public class ProcessTaskTest { private void testWithRecords(List records, ExtendedSequenceNumber lastCheckpointValue, ExtendedSequenceNumber largestPermittedCheckpointValue) { - when(mockGetRecordsExecutor.getRecords(anyInt())).thenReturn( + when(mockGetRecordsRetrivalStrategy.getRecords(anyInt())).thenReturn( new GetRecordsResult().withRecords(records)); when(mockCheckpointer.getLastCheckpointValue()).thenReturn(lastCheckpointValue); when(mockCheckpointer.getLargestPermittedCheckpointValue()).thenReturn(largestPermittedCheckpointValue); processTask.call(); verify(throttlingReporter).success(); verify(throttlingReporter, never()).throttled(); - verify(mockGetRecordsExecutor).getRecords(anyInt()); + verify(mockGetRecordsRetrivalStrategy).getRecords(anyInt()); ArgumentCaptor priCaptor = ArgumentCaptor.forClass(ProcessRecordsInput.class); verify(mockRecordProcessor).processRecords(priCaptor.capture()); processedRecords = priCaptor.getValue().getRecords();