Renaming class to Synchronous from Default
This commit is contained in:
parent
2c37bd0b5c
commit
e2aa89d8f6
3 changed files with 5 additions and 5 deletions
|
|
@ -83,7 +83,7 @@ class ProcessTask implements ITask {
|
||||||
long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist) {
|
long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist) {
|
||||||
this(shardInfo, streamConfig, recordProcessor, recordProcessorCheckpointer, dataFetcher, backoffTimeMillis,
|
this(shardInfo, streamConfig, recordProcessor, recordProcessorCheckpointer, dataFetcher, backoffTimeMillis,
|
||||||
skipShardSyncAtWorkerInitializationIfLeasesExist,
|
skipShardSyncAtWorkerInitializationIfLeasesExist,
|
||||||
new ThrottlingReporter(MAX_CONSECUTIVE_THROTTLES, shardInfo.getShardId()), new DefaultGetRecordsRetrivalStrategy(dataFetcher));
|
new ThrottlingReporter(MAX_CONSECUTIVE_THROTTLES, shardInfo.getShardId()), new SynchronousGetRecordsRetrivalStrategy(dataFetcher));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -8,7 +8,7 @@ import lombok.NonNull;
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
@Data
|
@Data
|
||||||
public class DefaultGetRecordsRetrivalStrategy implements GetRecordsRetrivalStrategy {
|
public class SynchronousGetRecordsRetrivalStrategy implements GetRecordsRetrivalStrategy {
|
||||||
@NonNull
|
@NonNull
|
||||||
private final KinesisDataFetcher dataFetcher;
|
private final KinesisDataFetcher dataFetcher;
|
||||||
|
|
||||||
|
|
@ -117,7 +117,7 @@ public class KinesisDataFetcherTest {
|
||||||
ICheckpoint checkpoint = mock(ICheckpoint.class);
|
ICheckpoint checkpoint = mock(ICheckpoint.class);
|
||||||
|
|
||||||
KinesisDataFetcher fetcher = new KinesisDataFetcher(kinesis, SHARD_INFO);
|
KinesisDataFetcher fetcher = new KinesisDataFetcher(kinesis, SHARD_INFO);
|
||||||
GetRecordsRetrivalStrategy getRecordsRetrivalStrategy = new DefaultGetRecordsRetrivalStrategy(fetcher);
|
GetRecordsRetrivalStrategy getRecordsRetrivalStrategy = new SynchronousGetRecordsRetrivalStrategy(fetcher);
|
||||||
|
|
||||||
String iteratorA = "foo";
|
String iteratorA = "foo";
|
||||||
String iteratorB = "bar";
|
String iteratorB = "bar";
|
||||||
|
|
@ -182,7 +182,7 @@ public class KinesisDataFetcherTest {
|
||||||
// Create data fectcher and initialize it with latest type checkpoint
|
// Create data fectcher and initialize it with latest type checkpoint
|
||||||
KinesisDataFetcher dataFetcher = new KinesisDataFetcher(mockProxy, SHARD_INFO);
|
KinesisDataFetcher dataFetcher = new KinesisDataFetcher(mockProxy, SHARD_INFO);
|
||||||
dataFetcher.initialize(SentinelCheckpoint.LATEST.toString(), INITIAL_POSITION_LATEST);
|
dataFetcher.initialize(SentinelCheckpoint.LATEST.toString(), INITIAL_POSITION_LATEST);
|
||||||
GetRecordsRetrivalStrategy getRecordsRetrivalStrategy = new DefaultGetRecordsRetrivalStrategy(dataFetcher);
|
GetRecordsRetrivalStrategy getRecordsRetrivalStrategy = new SynchronousGetRecordsRetrivalStrategy(dataFetcher);
|
||||||
// Call getRecords of dataFetcher which will throw an exception
|
// Call getRecords of dataFetcher which will throw an exception
|
||||||
getRecordsRetrivalStrategy.getRecords(maxRecords);
|
getRecordsRetrivalStrategy.getRecords(maxRecords);
|
||||||
|
|
||||||
|
|
@ -208,7 +208,7 @@ public class KinesisDataFetcherTest {
|
||||||
when(checkpoint.getCheckpoint(SHARD_ID)).thenReturn(new ExtendedSequenceNumber(seqNo));
|
when(checkpoint.getCheckpoint(SHARD_ID)).thenReturn(new ExtendedSequenceNumber(seqNo));
|
||||||
|
|
||||||
KinesisDataFetcher fetcher = new KinesisDataFetcher(kinesis, SHARD_INFO);
|
KinesisDataFetcher fetcher = new KinesisDataFetcher(kinesis, SHARD_INFO);
|
||||||
GetRecordsRetrivalStrategy getRecordsRetrivalStrategy = new DefaultGetRecordsRetrivalStrategy(fetcher);
|
GetRecordsRetrivalStrategy getRecordsRetrivalStrategy = new SynchronousGetRecordsRetrivalStrategy(fetcher);
|
||||||
fetcher.initialize(seqNo, initialPositionInStream);
|
fetcher.initialize(seqNo, initialPositionInStream);
|
||||||
List<Record> actualRecords = getRecordsRetrivalStrategy.getRecords(MAX_RECORDS).getRecords();
|
List<Record> actualRecords = getRecordsRetrivalStrategy.getRecords(MAX_RECORDS).getRecords();
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue