Don't Sleep for During Retrieval for the BlockingGetRecordsCache

The BlockingGetRecordsCache shouldn't sleep when retrieving records as backoff is provided in other parts of the ShardConumer.
This commit is contained in:
Sahil Palvia 2017-10-24 09:13:19 -07:00 committed by Justin Pfifer
parent cc7e329e2f
commit 73426bd733
4 changed files with 6 additions and 35 deletions

View file

@ -31,15 +31,11 @@ import lombok.extern.apachecommons.CommonsLog;
public class BlockingGetRecordsCache implements GetRecordsCache { public class BlockingGetRecordsCache implements GetRecordsCache {
private final int maxRecordsPerCall; private final int maxRecordsPerCall;
private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
private final long idleMillisBetweenCalls;
private Instant lastSuccessfulCall;
public BlockingGetRecordsCache(final int maxRecordsPerCall, public BlockingGetRecordsCache(final int maxRecordsPerCall,
final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) {
final long idleMillisBetweenCalls) {
this.maxRecordsPerCall = maxRecordsPerCall; this.maxRecordsPerCall = maxRecordsPerCall;
this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy; this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy;
this.idleMillisBetweenCalls = idleMillisBetweenCalls;
} }
@Override @Override
@ -51,31 +47,10 @@ public class BlockingGetRecordsCache implements GetRecordsCache {
@Override @Override
public ProcessRecordsInput getNextResult() { public ProcessRecordsInput getNextResult() {
sleepBeforeNextCall();
GetRecordsResult getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall); GetRecordsResult getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall);
lastSuccessfulCall = Instant.now(); return new ProcessRecordsInput()
ProcessRecordsInput processRecordsInput = new ProcessRecordsInput()
.withRecords(getRecordsResult.getRecords()) .withRecords(getRecordsResult.getRecords())
.withMillisBehindLatest(getRecordsResult.getMillisBehindLatest()); .withMillisBehindLatest(getRecordsResult.getMillisBehindLatest());
return processRecordsInput;
}
private void sleepBeforeNextCall() {
if (!Thread.interrupted()) {
if (lastSuccessfulCall == null) {
return;
}
long timeSinceLastCall = Duration.between(lastSuccessfulCall, Instant.now()).abs().toMillis();
if (timeSinceLastCall < idleMillisBetweenCalls) {
try {
Thread.sleep(idleMillisBetweenCalls - timeSinceLastCall);
} catch (InterruptedException e) {
log.info("Thread was interrupted, indicating that shutdown was called.");
}
}
} else {
log.info("Thread has been interrupted, indicating that it is in the shutdown phase.");
}
} }
@Override @Override

View file

@ -28,7 +28,6 @@ public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory {
private int maxRecordsCount = 30000; private int maxRecordsCount = 30000;
private long idleMillisBetweenCalls = 1500L; private long idleMillisBetweenCalls = 1500L;
private DataFetchingStrategy dataFetchingStrategy = DataFetchingStrategy.DEFAULT; private DataFetchingStrategy dataFetchingStrategy = DataFetchingStrategy.DEFAULT;
private IMetricsFactory metricsFactory;
public SimpleRecordsFetcherFactory(int maxRecords) { public SimpleRecordsFetcherFactory(int maxRecords) {
this.maxRecords = maxRecords; this.maxRecords = maxRecords;
@ -37,7 +36,7 @@ public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory {
@Override @Override
public GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, String shardId, IMetricsFactory metricsFactory) { public GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, String shardId, IMetricsFactory metricsFactory) {
if(dataFetchingStrategy.equals(DataFetchingStrategy.DEFAULT)) { if(dataFetchingStrategy.equals(DataFetchingStrategy.DEFAULT)) {
return new BlockingGetRecordsCache(maxRecords, getRecordsRetrievalStrategy, idleMillisBetweenCalls); return new BlockingGetRecordsCache(maxRecords, getRecordsRetrievalStrategy);
} else { } else {
return new PrefetchGetRecordsCache(maxPendingProcessRecordsInput, maxByteSize, maxRecordsCount, maxRecords, return new PrefetchGetRecordsCache(maxPendingProcessRecordsInput, maxByteSize, maxRecordsCount, maxRecords,
getRecordsRetrievalStrategy, getRecordsRetrievalStrategy,

View file

@ -40,7 +40,6 @@ import com.amazonaws.services.kinesis.model.Record;
@RunWith(MockitoJUnitRunner.class) @RunWith(MockitoJUnitRunner.class)
public class BlockingGetRecordsCacheTest { public class BlockingGetRecordsCacheTest {
private static final int MAX_RECORDS_PER_COUNT = 10_000; private static final int MAX_RECORDS_PER_COUNT = 10_000;
private static final long IDLE_MILLIS_BETWEEN_CALLS = 500L;
@Mock @Mock
private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
@ -53,7 +52,7 @@ public class BlockingGetRecordsCacheTest {
@Before @Before
public void setup() { public void setup() {
records = new ArrayList<>(); records = new ArrayList<>();
blockingGetRecordsCache = new BlockingGetRecordsCache(MAX_RECORDS_PER_COUNT, getRecordsRetrievalStrategy, IDLE_MILLIS_BETWEEN_CALLS); blockingGetRecordsCache = new BlockingGetRecordsCache(MAX_RECORDS_PER_COUNT, getRecordsRetrievalStrategy);
when(getRecordsRetrievalStrategy.getRecords(eq(MAX_RECORDS_PER_COUNT))).thenReturn(getRecordsResult); when(getRecordsRetrievalStrategy.getRecords(eq(MAX_RECORDS_PER_COUNT))).thenReturn(getRecordsResult);
when(getRecordsResult.getRecords()).thenReturn(records); when(getRecordsResult.getRecords()).thenReturn(records);

View file

@ -339,8 +339,7 @@ public class ShardConsumerTest {
KinesisDataFetcher dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo); KinesisDataFetcher dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo);
getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords, getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords,
new SynchronousGetRecordsRetrievalStrategy(dataFetcher), new SynchronousGetRecordsRetrievalStrategy(dataFetcher)));
0L));
when(recordsFetcherFactory.createRecordsFetcher(any(), anyString(),any())).thenReturn(getRecordsCache); when(recordsFetcherFactory.createRecordsFetcher(any(), anyString(),any())).thenReturn(getRecordsCache);
ShardConsumer consumer = ShardConsumer consumer =
@ -469,8 +468,7 @@ public class ShardConsumerTest {
KinesisDataFetcher dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo); KinesisDataFetcher dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo);
getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords, getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords,
new SynchronousGetRecordsRetrievalStrategy(dataFetcher), new SynchronousGetRecordsRetrievalStrategy(dataFetcher)));
0L));
when(recordsFetcherFactory.createRecordsFetcher(any(), anyString(),any())).thenReturn(getRecordsCache); when(recordsFetcherFactory.createRecordsFetcher(any(), anyString(),any())).thenReturn(getRecordsCache);
ShardConsumer consumer = ShardConsumer consumer =