Fixing a bug in PrefetchRecordsPublisher that was causing retry storms when initial requests fail, because there was no wait time between get calls.

Yatin 2020-12-08 04:49:01 -08:00
parent da3b11153f
commit 2d70002258
2 changed files with 99 additions and 14 deletions
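
Before this change, sleepBeforeNextCall() skipped the sleep entirely whenever lastSuccessfulCall was still null, so a run of failing initial GetRecords attempts retried back to back with no delay between them. A simplified sketch of that pre-fix behavior, inferred from the hunk below rather than copied from the source, relying on the class's lastSuccessfulCall and idleMillisBetweenCalls fields:

    private void sleepBeforeNextCall() throws InterruptedException {
        if (lastSuccessfulCall == null) {
            return; // no wait at all between failed initial attempts
        }
        // ...otherwise sleep out the remainder of idleMillisBetweenCalls...
    }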


@@ -86,6 +86,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
private final MetricsFactory metricsFactory;
private final long idleMillisBetweenCalls;
private Instant lastSuccessfulCall;
private boolean isFirstGetCallTry = true;
private final DefaultGetRecordsCacheDaemon defaultGetRecordsCacheDaemon;
private boolean started = false;
private final String operation;
@@ -489,7 +490,13 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
}
private void sleepBeforeNextCall() throws InterruptedException {
if (lastSuccessfulCall == null) {
if (lastSuccessfulCall == null && isFirstGetCallTry) {
isFirstGetCallTry = false;
return;
}
// If lastSuccessfulCall is still null but this is not the first try, sleep to avoid a retry storm
if (lastSuccessfulCall == null) {
Thread.sleep(idleMillisBetweenCalls);
return;
}
long timeSinceLastCall = Duration.between(lastSuccessfulCall, Instant.now()).abs().toMillis();
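
For context, the whole method after this change likely reads roughly as follows. This is a hedged reconstruction: the hunk above stops at the timeSinceLastCall line, so the trailing sleep computation is an assumption based on the idleMillisBetweenCalls field rather than something shown in the diff.

    private void sleepBeforeNextCall() throws InterruptedException {
        // Skip the sleep only for the very first attempt.
        if (lastSuccessfulCall == null && isFirstGetCallTry) {
            isFirstGetCallTry = false;
            return;
        }
        // Still no successful call, but not the first attempt: back off for the full
        // idle interval instead of retrying immediately, which prevents the retry storm.
        if (lastSuccessfulCall == null) {
            Thread.sleep(idleMillisBetweenCalls);
            return;
        }
        // Assumed tail: sleep out whatever is left of the idle interval since the
        // last successful call.
        long timeSinceLastCall = Duration.between(lastSuccessfulCall, Instant.now()).abs().toMillis();
        if (timeSinceLastCall < idleMillisBetweenCalls) {
            Thread.sleep(idleMillisBetweenCalls - timeSinceLastCall);
        }
    }

The new tests below exercise both sides of this behavior: one uses a timeout shorter than the enforced idle delay and expects a RuntimeException, while the other waits long enough and then verifies the retried calls on the mocked retrieval strategy.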


@@ -32,6 +32,7 @@ import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -76,6 +77,7 @@ import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.kinesis.model.ChildShard;
import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
@@ -198,7 +200,8 @@ public class PrefetchRecordsPublisherTest {
.map(KinesisClientRecord::fromRecord).collect(Collectors.toList());
getRecordsCache.start(sequenceNumber, initialPosition);
ProcessRecordsInput result = blockUntilRecordsAvailable(() -> evictPublishedEvent(getRecordsCache, "shardId"), 1000L)
ProcessRecordsInput result = blockUntilRecordsAvailable(() -> evictPublishedEvent(getRecordsCache,
"shardId"), 1000L)
.processRecordsInput();
assertEquals(expectedRecords, result.records());
@@ -208,6 +211,81 @@ public class PrefetchRecordsPublisherTest {
verify(getRecordsRetrievalStrategy, atLeast(1)).getRecords(eq(MAX_RECORDS_PER_CALL));
}
@Test(expected = RuntimeException.class)
public void testGetRecordsWithInitialFailures_LessThanRequiredWait_Throws() {
// Create a new PrefetchRecordsPublisher with 1s idle time between get calls
getRecordsCache = new PrefetchRecordsPublisher(
MAX_SIZE,
3 * SIZE_1_MB,
MAX_RECORDS_COUNT,
MAX_RECORDS_PER_CALL,
getRecordsRetrievalStrategy,
executorService,
1000,
new NullMetricsFactory(),
operation,
"shardId");
// Set up the retrieval strategy to fail the initial calls before succeeding
when(getRecordsRetrievalStrategy.getRecords(eq(MAX_RECORDS_PER_CALL)))
        .thenThrow(new RetryableRetrievalException("Timed out"))
        .thenThrow(new RetryableRetrievalException("Timed out again"))
        .thenReturn(getRecordsResponse);
record = Record.builder().data(createByteBufferWithSize(SIZE_512_KB)).build();
when(records.size()).thenReturn(1000);
final List<KinesisClientRecord> expectedRecords = records.stream()
.map(KinesisClientRecord::fromRecord).collect(Collectors.toList());
getRecordsCache.start(sequenceNumber, initialPosition);
ProcessRecordsInput result = null;
// Set the timeout to less than what the PrefetchRecordsPublisher will need, based on the idle time between
// get calls, to validate that an exception is thrown
result = blockUntilRecordsAvailable(() -> evictPublishedEvent(getRecordsCache, "shardId"), 1000L)
        .processRecordsInput();
}
@Test
public void testGetRecordsWithInitialFailures_AdequateWait_Success() {
// Create a new PrefetchRecordsPublisher with 1s idle time between get calls
getRecordsCache = new PrefetchRecordsPublisher(
MAX_SIZE,
3 * SIZE_1_MB,
MAX_RECORDS_COUNT,
MAX_RECORDS_PER_CALL,
getRecordsRetrievalStrategy,
executorService,
1000,
new NullMetricsFactory(),
operation,
"shardId");
// Set up the retrieval strategy to fail the initial calls before succeeding
when(getRecordsRetrievalStrategy.getRecords(eq(MAX_RECORDS_PER_CALL)))
        .thenThrow(new RetryableRetrievalException("Timed out"))
        .thenThrow(new RetryableRetrievalException("Timed out again"))
        .thenReturn(getRecordsResponse);
record = Record.builder().data(createByteBufferWithSize(SIZE_512_KB)).build();
when(records.size()).thenReturn(1000);
final List<KinesisClientRecord> expectedRecords = records.stream()
.map(KinesisClientRecord::fromRecord).collect(Collectors.toList());
getRecordsCache.start(sequenceNumber, initialPosition);
ProcessRecordsInput result = null;
// Set the timeout to more than what the PrefetchRecordsPublisher will need, based on the idle time between
// get calls, then validate the mocks afterwards
result = blockUntilRecordsAvailable(() -> evictPublishedEvent(getRecordsCache, "shardId"), 4000L)
        .processRecordsInput();
assertEquals(expectedRecords, result.records());
assertEquals(new ArrayList<>(), result.childShards());
verify(executorService).execute(any());
// Validate that at least 3 calls were made, including the 2 failed ones
verify(getRecordsRetrievalStrategy, atLeast(3)).getRecords(eq(MAX_RECORDS_PER_CALL));
}
@Test
public void testGetRecordsWithInvalidResponse() {
record = Record.builder().data(createByteBufferWithSize(SIZE_512_KB)).build();
@@ -238,15 +316,15 @@ public class PrefetchRecordsPublisherTest {
List<String> parentShards = new ArrayList<>();
parentShards.add("shardId");
ChildShard leftChild = ChildShard.builder()
.shardId("shardId-000000000001")
.parentShards(parentShards)
.hashKeyRange(ShardObjectHelper.newHashKeyRange("0", "49"))
.build();
.shardId("shardId-000000000001")
.parentShards(parentShards)
.hashKeyRange(ShardObjectHelper.newHashKeyRange("0", "49"))
.build();
ChildShard rightChild = ChildShard.builder()
.shardId("shardId-000000000002")
.parentShards(parentShards)
.hashKeyRange(ShardObjectHelper.newHashKeyRange("50", "99"))
.build();
.shardId("shardId-000000000002")
.parentShards(parentShards)
.hashKeyRange(ShardObjectHelper.newHashKeyRange("50", "99"))
.build();
childShards.add(leftChild);
childShards.add(rightChild);
@@ -292,9 +370,9 @@ public class PrefetchRecordsPublisherTest {
sleep(2000);
int callRate = (int) Math.ceil((double) MAX_RECORDS_COUNT/recordsSize);
// TODO: fix this verification
// verify(getRecordsRetrievalStrategy, times(callRate)).getRecords(MAX_RECORDS_PER_CALL);
// assertEquals(spyQueue.size(), callRate);
// TODO: fix this verification
// verify(getRecordsRetrievalStrategy, times(callRate)).getRecords(MAX_RECORDS_PER_CALL);
// assertEquals(spyQueue.size(), callRate);
assertTrue("Call Rate is "+callRate,callRate < MAX_SIZE);
}
@@ -410,7 +488,7 @@ public class PrefetchRecordsPublisherTest {
log.info("Queue is currently at {} starting subscriber", getRecordsCache.getPublisherSession().prefetchRecordsQueue().size());
AtomicInteger receivedItems = new AtomicInteger(0);
final int expectedItems = MAX_SIZE * 10;
Object lock = new Object();