Fixing Prefetch publisher cache restart issue

This commit is contained in:
Ashwin Giridharan 2020-06-22 23:23:41 -07:00
parent db2c22e046
commit d4f3c0b14a
2 changed files with 12 additions and 4 deletions

View file

@ -230,12 +230,12 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
if (executorService.isShutdown()) { if (executorService.isShutdown()) {
throw new IllegalStateException("ExecutorService has been shutdown."); throw new IllegalStateException("ExecutorService has been shutdown.");
} }
publisherSession.init(extendedSequenceNumber, initialPositionInStreamExtended);
if (!started) { if (!started) {
log.info("{} : Starting prefetching thread.", streamAndShardId); publisherSession.init(extendedSequenceNumber, initialPositionInStreamExtended);
log.info("{} : Starting prefetching thread and initializing publisher session.", streamAndShardId);
executorService.execute(defaultGetRecordsCacheDaemon); executorService.execute(defaultGetRecordsCacheDaemon);
} else {
log.info("{} : Skipping publisher start as it was already started.", streamAndShardId);
} }
started = true; started = true;
} }

View file

@ -145,6 +145,14 @@ public class PrefetchRecordsPublisherTest {
when(getRecordsRetrievalStrategy.getRecords(eq(MAX_RECORDS_PER_CALL))).thenReturn(getRecordsResponse); when(getRecordsRetrievalStrategy.getRecords(eq(MAX_RECORDS_PER_CALL))).thenReturn(getRecordsResponse);
} }
@Test
public void testDataFetcherIsNotReInitializedOnMultipleCacheStarts() {
getRecordsCache.start(sequenceNumber, initialPosition);
getRecordsCache.start(sequenceNumber, initialPosition);
getRecordsCache.start(sequenceNumber, initialPosition);
verify(dataFetcher, times(1)).initialize(any(ExtendedSequenceNumber.class), any());
}
@Test @Test
public void testGetRecords() { public void testGetRecords() {
record = Record.builder().data(createByteBufferWithSize(SIZE_512_KB)).build(); record = Record.builder().data(createByteBufferWithSize(SIZE_512_KB)).build();