Addressed comments

This commit is contained in:
Wei 2017-10-09 15:17:33 -07:00
parent eff10a8253
commit 8055b6379a
7 changed files with 10 additions and 15 deletions

View file

@ -52,12 +52,9 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
private final long idleMillisBetweenCalls;
private Instant lastSuccessfulCall;
private final DefaultGetRecordsCacheDaemon defaultGetRecordsCacheDaemon;
private PrefetchCounters prefetchCounters;
private boolean started = false;
private String operation;
private final String operation;
/**
* Constructor for the PrefetchGetRecordsCache. This cache prefetches records from Kinesis and stores them in a

View file

@ -26,11 +26,10 @@ public interface RecordsFetcherFactory {
* @param getRecordsRetrievalStrategy GetRecordsRetrievalStrategy to be used with the GetRecordsCache
* @param shardId ShardId of the shard that the fetcher will retrieve records for
* @param metricsFactory MetricsFactory used to create metricScope
* @param operation operation name used to emit metrics
*
* @return GetRecordsCache used to get records from Kinesis.
*/
GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, String shardId, IMetricsFactory metricsFactory, String operation);
GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, String shardId, IMetricsFactory metricsFactory);
/**
* Sets the maximum number of ProcessRecordsInput objects the GetRecordsCache can hold, before further requests are

View file

@ -235,7 +235,7 @@ class ShardConsumer {
this.dataFetcher = kinesisDataFetcher;
this.getRecordsCache = config.getRecordsFetcherFactory().createRecordsFetcher(
makeStrategy(this.dataFetcher, retryGetRecordsInSeconds, maxGetRecordsThreadPool, this.shardInfo),
this.getShardInfo().getShardId(), metricsFactory, "ProcessTask");
this.getShardInfo().getShardId(), this.metricsFactory);
}
/**

View file

@ -35,7 +35,7 @@ public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory {
}
@Override
public GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, String shardId, IMetricsFactory metricsFactory, String operation) {
public GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, String shardId, IMetricsFactory metricsFactory) {
if(dataFetchingStrategy.equals(DataFetchingStrategy.DEFAULT)) {
return new BlockingGetRecordsCache(maxRecords, getRecordsRetrievalStrategy, idleMillisBetweenCalls);
} else {
@ -45,7 +45,7 @@ public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory {
.setDaemon(true)
.setNameFormat("prefetch-cache-" + shardId + "-%04d")
.build()),
idleMillisBetweenCalls, metricsFactory, operation);
idleMillisBetweenCalls, metricsFactory, "processTask");
}
}

View file

@ -13,7 +13,6 @@ import org.mockito.MockitoAnnotations;
public class RecordsFetcherFactoryTest {
private String shardId = "TestShard";
private RecordsFetcherFactory recordsFetcherFactory;
private String operation = "ProcessTask";
@Mock
private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
@ -30,7 +29,7 @@ public class RecordsFetcherFactoryTest {
@Test
public void createDefaultRecordsFetcherTest() {
GetRecordsCache recordsCache = recordsFetcherFactory.createRecordsFetcher(getRecordsRetrievalStrategy, shardId,
metricsFactory, operation);
metricsFactory);
assertThat(recordsCache, instanceOf(BlockingGetRecordsCache.class));
}
@ -38,7 +37,7 @@ public class RecordsFetcherFactoryTest {
public void createPrefetchRecordsFetcherTest() {
recordsFetcherFactory.setDataFetchingStrategy(DataFetchingStrategy.PREFETCH_CACHED);
GetRecordsCache recordsCache = recordsFetcherFactory.createRecordsFetcher(getRecordsRetrievalStrategy, shardId,
metricsFactory, operation);
metricsFactory);
assertThat(recordsCache, instanceOf(PrefetchGetRecordsCache.class));
}

View file

@ -341,7 +341,7 @@ public class ShardConsumerTest {
getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords,
new SynchronousGetRecordsRetrievalStrategy(dataFetcher),
0L));
when(recordsFetcherFactory.createRecordsFetcher(any(), anyString(),any(),anyString())).thenReturn(getRecordsCache);
when(recordsFetcherFactory.createRecordsFetcher(any(), anyString(),any())).thenReturn(getRecordsCache);
ShardConsumer consumer =
new ShardConsumer(shardInfo,
@ -471,7 +471,7 @@ public class ShardConsumerTest {
getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords,
new SynchronousGetRecordsRetrievalStrategy(dataFetcher),
0L));
when(recordsFetcherFactory.createRecordsFetcher(any(), anyString(),any(),anyString())).thenReturn(getRecordsCache);
when(recordsFetcherFactory.createRecordsFetcher(any(), anyString(),any())).thenReturn(getRecordsCache);
ShardConsumer consumer =
new ShardConsumer(shardInfo,

View file

@ -621,7 +621,7 @@ public class WorkerTest {
RecordsFetcherFactory recordsFetcherFactory = mock(RecordsFetcherFactory.class);
GetRecordsCache getRecordsCache = mock(GetRecordsCache.class);
when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory);
when(recordsFetcherFactory.createRecordsFetcher(any(), anyString(),any(), anyString())).thenReturn(getRecordsCache);
when(recordsFetcherFactory.createRecordsFetcher(any(), anyString(),any())).thenReturn(getRecordsCache);
when(getRecordsCache.getNextResult()).thenReturn(new ProcessRecordsInput().withRecords(Collections.emptyList()).withMillisBehindLatest(0L));
WorkerThread workerThread = runWorker(shardList,