Merge pull request #63 from ashwing/ltr_1_duplicate_recseq_fix

Fixing Prefetch publisher cache restart issue
This commit is contained in:
ashwing 2020-06-23 17:36:16 -07:00 committed by GitHub
commit 98d723b576
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 60 additions and 4 deletions

View file

@ -230,12 +230,12 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
if (executorService.isShutdown()) {
throw new IllegalStateException("ExecutorService has been shutdown.");
}
publisherSession.init(extendedSequenceNumber, initialPositionInStreamExtended);
if (!started) {
log.info("{} : Starting prefetching thread.", streamAndShardId);
log.info("{} : Starting Prefetching thread and initializing publisher session.", streamAndShardId);
publisherSession.init(extendedSequenceNumber, initialPositionInStreamExtended);
executorService.execute(defaultGetRecordsCacheDaemon);
} else {
log.info("{} : Skipping publisher start as it was already started.", streamAndShardId);
}
started = true;
}

View file

@ -31,10 +31,12 @@ import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static software.amazon.kinesis.utils.BlockingUtils.blockUntilConditionSatisfied;
import static software.amazon.kinesis.utils.BlockingUtils.blockUntilRecordsAvailable;
import static software.amazon.kinesis.utils.ProcessRecordsInputMatcher.eqProcessRecordsInput;
@ -47,6 +49,7 @@ import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -57,6 +60,7 @@ import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
@ -145,6 +149,45 @@ public class PrefetchRecordsPublisherTest {
when(getRecordsRetrievalStrategy.getRecords(eq(MAX_RECORDS_PER_CALL))).thenReturn(getRecordsResponse);
}
@Test
public void testDataFetcherIsNotReInitializedOnMultipleCacheStarts() {
getRecordsCache.start(sequenceNumber, initialPosition);
getRecordsCache.start(sequenceNumber, initialPosition);
getRecordsCache.start(sequenceNumber, initialPosition);
verify(dataFetcher, times(1)).initialize(any(ExtendedSequenceNumber.class), any());
}
@Test
public void testPrefetchPublisherInternalStateNotModifiedWhenPrefetcherThreadStartFails() {
doThrow(new RejectedExecutionException()).doThrow(new RejectedExecutionException()).doCallRealMethod()
.when(executorService).execute(any());
// Initialize try 1
tryPrefetchCacheStart();
blockUntilConditionSatisfied(() -> getRecordsCache.getPublisherSession().prefetchRecordsQueue().size() == MAX_SIZE, 300);
verifyInternalState(0);
// Initialize try 2
tryPrefetchCacheStart();
blockUntilConditionSatisfied(() -> getRecordsCache.getPublisherSession().prefetchRecordsQueue().size() == MAX_SIZE, 300);
verifyInternalState(0);
// Initialize try 3
tryPrefetchCacheStart();
blockUntilConditionSatisfied(() -> getRecordsCache.getPublisherSession().prefetchRecordsQueue().size() == MAX_SIZE, 300);
verifyInternalState(MAX_SIZE);
verify(dataFetcher, times(3)).initialize(any(ExtendedSequenceNumber.class), any());
}
private void tryPrefetchCacheStart() {
try {
getRecordsCache.start(sequenceNumber, initialPosition);
} catch (Exception e) {
// suppress exception
}
}
private void verifyInternalState(int queueSize) {
Assert.assertTrue(getRecordsCache.getPublisherSession().prefetchRecordsQueue().size() == queueSize);
}
@Test
public void testGetRecords() {
record = Record.builder().data(createByteBufferWithSize(SIZE_512_KB)).build();

View file

@ -35,4 +35,17 @@ public class BlockingUtils {
throw new RuntimeException("No records found");
}
}
public static boolean blockUntilConditionSatisfied(Supplier<Boolean> conditionSupplier, long timeoutMillis) {
while(!conditionSupplier.get() && timeoutMillis > 0 ) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
timeoutMillis -= 100;
}
return conditionSupplier.get();
}
}