Added an acquire timeout log message and a test. (#514)

The test doesn't verify the message, but does verify that an acquire
timeout triggers the FanOutRecordsPublisher to call logAcquireTimeoutMessage.
This commit is contained in:
Justin Pfifer 2019-03-06 13:18:15 -08:00 committed by Sahil Palvia
parent 610295eab4
commit 6685a924d5
2 changed files with 154 additions and 16 deletions

View file

@ -157,14 +157,21 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
String logMessage = String.format( String logMessage = String.format(
"%s: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ %s id: %s -- %s", "%s: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ %s id: %s -- %s",
shardId, flow.connectionStartedAt, flow.subscribeToShardId, category.throwableTypeString); shardId, flow.connectionStartedAt, flow.subscribeToShardId, category.throwableTypeString);
if (category.throwableType.equals(ThrowableType.READ_TIMEOUT)) { switch (category.throwableType) {
case READ_TIMEOUT:
log.debug(logMessage, propagationThrowable); log.debug(logMessage, propagationThrowable);
propagationThrowable = new RetryableRetrievalException(category.throwableTypeString, propagationThrowable = new RetryableRetrievalException(category.throwableTypeString,
(Exception) propagationThrowable.getCause()); (Exception) propagationThrowable.getCause());
} else { break;
case ACQUIRE_TIMEOUT:
logAcquireTimeoutMessage(t);
//
// Fall through is intentional here as we still want to log the details of the exception
//
default:
log.warn(logMessage, propagationThrowable); log.warn(logMessage, propagationThrowable);
}
}
flow.cancel(); flow.cancel();
} }
log.debug("{}: availableQueueSpace zeroing from {}", shardId, availableQueueSpace); log.debug("{}: availableQueueSpace zeroing from {}", shardId, availableQueueSpace);
@ -190,6 +197,13 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
} }
} }
/**
 * Emits an error-level hint explaining the usual cause of an acquire timeout and pointing the
 * operator at {@code software.amazon.kinesis.common.KinesisClientUtil}.
 *
 * <p>The method is {@code protected} so subclasses can override it, for example to observe that
 * it was invoked.
 *
 * @param t the throwable that was classified as an acquire timeout; it is not included in the
 *          message written here
 */
protected void logAcquireTimeoutMessage(Throwable t) {
    final String guidance =
            "An acquire timeout occurred which usually indicates that the KinesisAsyncClient supplied has a " +
            "low maximum streams limit. " +
            "Please use the software.amazon.kinesis.common.KinesisClientUtil to setup the client, " +
            "or refer to the class to setup the client manually.";
    log.error(guidance);
}
private void handleFlowError(Throwable t) { private void handleFlowError(Throwable t) {
if (t.getCause() instanceof ResourceNotFoundException) { if (t.getCause() instanceof ResourceNotFoundException) {
log.debug( log.debug(
@ -197,8 +211,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
shardId); shardId);
FanoutRecordsRetrieved response = new FanoutRecordsRetrieved( FanoutRecordsRetrieved response = new FanoutRecordsRetrieved(
ProcessRecordsInput.builder().records(Collections.emptyList()).isAtShardEnd(true).build(), null); ProcessRecordsInput.builder().records(Collections.emptyList()).isAtShardEnd(true).build(), null);
subscriber subscriber.onNext(response);
.onNext(response);
subscriber.onComplete(); subscriber.onComplete();
} else { } else {
subscriber.onError(t); subscriber.onError(t);
@ -280,7 +293,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
ProcessRecordsInput input = ProcessRecordsInput.builder().cacheEntryTime(Instant.now()) ProcessRecordsInput input = ProcessRecordsInput.builder().cacheEntryTime(Instant.now())
.millisBehindLatest(recordBatchEvent.millisBehindLatest()) .millisBehindLatest(recordBatchEvent.millisBehindLatest())
.isAtShardEnd(recordBatchEvent.continuationSequenceNumber() == null).records(records).build(); .isAtShardEnd(recordBatchEvent.continuationSequenceNumber() == null).records(records).build();
FanoutRecordsRetrieved recordsRetrieved = new FanoutRecordsRetrieved(input, recordBatchEvent.continuationSequenceNumber()); FanoutRecordsRetrieved recordsRetrieved = new FanoutRecordsRetrieved(input,
recordBatchEvent.continuationSequenceNumber());
try { try {
subscriber.onNext(recordsRetrieved); subscriber.onNext(recordsRetrieved);
@ -416,7 +430,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
} }
subscriber = null; subscriber = null;
if (flow != null) { if (flow != null) {
log.debug("{}: [SubscriptionLifetime]: (FanOutRecordsPublisher/Subscription#cancel) @ {} id: {}", log.debug(
"{}: [SubscriptionLifetime]: (FanOutRecordsPublisher/Subscription#cancel) @ {} id: {}",
shardId, flow.connectionStartedAt, flow.subscribeToShardId); shardId, flow.connectionStartedAt, flow.subscribeToShardId);
flow.cancel(); flow.cancel();
availableQueueSpace = 0; availableQueueSpace = 0;

View file

@ -14,11 +14,14 @@ import static org.mockito.Mockito.verify;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.time.Instant; import java.time.Instant;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import io.netty.handler.timeout.ReadTimeoutException;
import org.hamcrest.Description; import org.hamcrest.Description;
import org.hamcrest.Matcher; import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeDiagnosingMatcher; import org.hamcrest.TypeSafeDiagnosingMatcher;
@ -30,9 +33,12 @@ import org.mockito.runners.MockitoJUnitRunner;
import org.reactivestreams.Subscriber; import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription; import org.reactivestreams.Subscription;
import io.netty.handler.timeout.ReadTimeoutException;
import lombok.Data;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.core.async.SdkPublisher; import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.Record; import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
@ -71,13 +77,15 @@ public class FanOutRecordsPublisherTest {
public void simpleTest() throws Exception { public void simpleTest() throws Exception {
FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN); FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN);
ArgumentCaptor<FanOutRecordsPublisher.RecordSubscription> captor = ArgumentCaptor.forClass(FanOutRecordsPublisher.RecordSubscription.class); ArgumentCaptor<FanOutRecordsPublisher.RecordSubscription> captor = ArgumentCaptor
.forClass(FanOutRecordsPublisher.RecordSubscription.class);
ArgumentCaptor<FanOutRecordsPublisher.RecordFlow> flowCaptor = ArgumentCaptor ArgumentCaptor<FanOutRecordsPublisher.RecordFlow> flowCaptor = ArgumentCaptor
.forClass(FanOutRecordsPublisher.RecordFlow.class); .forClass(FanOutRecordsPublisher.RecordFlow.class);
doNothing().when(publisher).subscribe(captor.capture()); doNothing().when(publisher).subscribe(captor.capture());
source.start(ExtendedSequenceNumber.LATEST, InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)); source.start(ExtendedSequenceNumber.LATEST,
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST));
List<ProcessRecordsInput> receivedInput = new ArrayList<>(); List<ProcessRecordsInput> receivedInput = new ArrayList<>();
@ -138,13 +146,15 @@ public class FanOutRecordsPublisherTest {
public void largeRequestTest() throws Exception { public void largeRequestTest() throws Exception {
FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN); FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN);
ArgumentCaptor<FanOutRecordsPublisher.RecordSubscription> captor = ArgumentCaptor.forClass(FanOutRecordsPublisher.RecordSubscription.class); ArgumentCaptor<FanOutRecordsPublisher.RecordSubscription> captor = ArgumentCaptor
.forClass(FanOutRecordsPublisher.RecordSubscription.class);
ArgumentCaptor<FanOutRecordsPublisher.RecordFlow> flowCaptor = ArgumentCaptor ArgumentCaptor<FanOutRecordsPublisher.RecordFlow> flowCaptor = ArgumentCaptor
.forClass(FanOutRecordsPublisher.RecordFlow.class); .forClass(FanOutRecordsPublisher.RecordFlow.class);
doNothing().when(publisher).subscribe(captor.capture()); doNothing().when(publisher).subscribe(captor.capture());
source.start(ExtendedSequenceNumber.LATEST, InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)); source.start(ExtendedSequenceNumber.LATEST,
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST));
List<ProcessRecordsInput> receivedInput = new ArrayList<>(); List<ProcessRecordsInput> receivedInput = new ArrayList<>();
@ -288,9 +298,8 @@ public class FanOutRecordsPublisherTest {
ArgumentCaptor<FanOutRecordsPublisher.RecordFlow> nextFlowCaptor = ArgumentCaptor ArgumentCaptor<FanOutRecordsPublisher.RecordFlow> nextFlowCaptor = ArgumentCaptor
.forClass(FanOutRecordsPublisher.RecordFlow.class); .forClass(FanOutRecordsPublisher.RecordFlow.class);
SubscribeToShardRequest nextExpected = SubscribeToShardRequest.builder().consumerARN(CONSUMER_ARN)
SubscribeToShardRequest nextExpected = SubscribeToShardRequest.builder().consumerARN(CONSUMER_ARN).shardId(SHARD_ID) .shardId(SHARD_ID).startingPosition(StartingPosition.builder().sequenceNumber("3")
.startingPosition(StartingPosition.builder().sequenceNumber("3")
.type(ShardIteratorType.AFTER_SEQUENCE_NUMBER).build()) .type(ShardIteratorType.AFTER_SEQUENCE_NUMBER).build())
.build(); .build();
@ -301,7 +310,6 @@ public class FanOutRecordsPublisherTest {
nextFlowCaptor.getValue().onEventStream(publisher); nextFlowCaptor.getValue().onEventStream(publisher);
nextSubscribeCaptor.getValue().onSubscribe(subscription); nextSubscribeCaptor.getValue().onSubscribe(subscription);
List<Record> nextRecords = Stream.of(4, 5, 6).map(this::makeRecord).collect(Collectors.toList()); List<Record> nextRecords = Stream.of(4, 5, 6).map(this::makeRecord).collect(Collectors.toList());
List<KinesisClientRecordMatcher> nextMatchers = nextRecords.stream().map(KinesisClientRecordMatcher::new) List<KinesisClientRecordMatcher> nextMatchers = nextRecords.stream().map(KinesisClientRecordMatcher::new)
.collect(Collectors.toList()); .collect(Collectors.toList());
@ -319,6 +327,60 @@ public class FanOutRecordsPublisherTest {
} }
/**
 * Feeds an acquire-timeout failure into the active flow and checks two things: the subscriber
 * receives the original throwable through {@code onError}, and the publisher invokes
 * {@code logAcquireTimeoutMessage}. The text of the log message itself is not verified.
 */
@Test
public void acquireTimeoutTriggersLogMethodForActiveFlow() {
    // Flipped by the overriding hook below so the test can observe the call.
    AtomicBoolean logHookInvoked = new AtomicBoolean(false);
    FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN) {
        @Override
        protected void logAcquireTimeoutMessage(Throwable t) {
            super.logAcquireTimeoutMessage(t);
            logHookInvoked.set(true);
        }
    };

    ArgumentCaptor<FanOutRecordsPublisher.RecordSubscription> subscriptionCaptor = ArgumentCaptor
            .forClass(FanOutRecordsPublisher.RecordSubscription.class);
    ArgumentCaptor<FanOutRecordsPublisher.RecordFlow> flowCaptor = ArgumentCaptor
            .forClass(FanOutRecordsPublisher.RecordFlow.class);

    doNothing().when(publisher).subscribe(subscriptionCaptor.capture());

    source.start(ExtendedSequenceNumber.LATEST,
            InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST));

    RecordingSubscriber recorder = new RecordingSubscriber();
    source.subscribe(recorder);

    verify(kinesisClient).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture());

    // Message text used for the simulated acquire-timeout cause.
    String acquireTimeoutText =
            "Acquire operation took longer than the configured maximum time. This indicates that a " +
            "request cannot get a connection from the pool within the specified maximum time. " +
            "This can be due to high request rate.\n" +
            "Consider taking any of the following actions to mitigate the issue: increase max " +
            "connections, increase acquire timeout, or slowing the request rate.\n" +
            "Increasing the max connections can increase client throughput (unless the network " +
            "interface is already fully utilized), but can eventually start to hit operation " +
            "system limitations on the number of file descriptors used by the process. " +
            "If you already are fully utilizing your network interface or cannot further " +
            "increase your connection count, increasing the acquire timeout gives extra time " +
            "for requests to acquire a connection before timing out. " +
            "If the connections doesn't free up, the subsequent requests will still timeout.\n" +
            "If the above mechanisms are not able to fix the issue, try smoothing out your " +
            "requests so that large traffic bursts cannot overload the client, being more " +
            "efficient with the number of times you need to call AWS, or by increasing the " +
            "number of hosts sending requests.";
    Throwable rootCause = new Throwable(acquireTimeoutText);
    Throwable exception = new CompletionException(
            "software.amazon.awssdk.core.exception.SdkClientException",
            SdkClientException.create(null, rootCause));

    flowCaptor.getValue().exceptionOccurred(exception);

    Optional<OnErrorEvent> firstError = recorder.events.stream()
            .filter(OnErrorEvent.class::isInstance)
            .map(OnErrorEvent.class::cast)
            .findFirst();
    Optional<OnErrorEvent> expectedError = Optional.of(new OnErrorEvent(exception));

    assertThat(firstError, equalTo(expectedError));
    assertThat(logHookInvoked.get(), equalTo(true));
}
private void verifyRecords(List<KinesisClientRecord> clientRecordsList, List<KinesisClientRecordMatcher> matchers) { private void verifyRecords(List<KinesisClientRecord> clientRecordsList, List<KinesisClientRecordMatcher> matchers) {
assertThat(clientRecordsList.size(), equalTo(matchers.size())); assertThat(clientRecordsList.size(), equalTo(matchers.size()));
for (int i = 0; i < clientRecordsList.size(); ++i) { for (int i = 0; i < clientRecordsList.size(); ++i) {
@ -326,6 +388,67 @@ public class FanOutRecordsPublisherTest {
} }
} }
/**
 * Marker type for the entries stored in {@code RecordingSubscriber.events}; it declares no
 * members of its own.
 */
private interface SubscriberEvent {
}
/**
 * Event recorded by {@code RecordingSubscriber.onSubscribe}, holding the subscription that was
 * passed in.
 */
@Data
private static class SubscribeEvent implements SubscriberEvent {
final Subscription subscription;
}
/**
 * Event recorded by {@code RecordingSubscriber.onNext}, holding the delivered
 * {@code RecordsRetrieved}.
 */
@Data
private static class OnNextEvent implements SubscriberEvent {
final RecordsRetrieved recordsRetrieved;
}
/**
 * Event recorded by {@code RecordingSubscriber.onError}, holding the throwable that was
 * delivered. The acquire-timeout test compares instances of this class with {@code equalTo},
 * relying on the {@code equals} that Lombok's {@code @Data} is expected to generate.
 */
@Data
private static class OnErrorEvent implements SubscriberEvent {
final Throwable throwable;
}
/**
 * Event recorded by {@code RecordingSubscriber.onComplete}; it carries no data.
 */
@Data
private static class OnCompleteEvent implements SubscriberEvent {
}
/**
 * Event recorded by {@code RecordingSubscriber} right after it calls
 * {@code Subscription.request}, holding the amount that was requested.
 */
@Data
private static class RequestEvent implements SubscriberEvent {
final long requested;
}
/**
 * Test subscriber that appends a {@code SubscriberEvent} to {@link #events} for every callback
 * it receives, in the order the callbacks arrive. It requests one item when subscribed and one
 * more after each {@code onNext}, recording a {@code RequestEvent} after each request.
 */
private static class RecordingSubscriber implements Subscriber<RecordsRetrieved> {

    /** Everything observed so far, oldest first. */
    final List<SubscriberEvent> events = new LinkedList<>();

    /** The subscription handed to {@link #onSubscribe(Subscription)}. */
    Subscription subscription;

    @Override
    public void onSubscribe(Subscription s) {
        events.add(new SubscribeEvent(s));
        subscription = s;
        requestOneAndRecord();
    }

    @Override
    public void onNext(RecordsRetrieved recordsRetrieved) {
        events.add(new OnNextEvent(recordsRetrieved));
        requestOneAndRecord();
    }

    @Override
    public void onError(Throwable t) {
        events.add(new OnErrorEvent(t));
    }

    @Override
    public void onComplete() {
        events.add(new OnCompleteEvent());
    }

    /** Requests a single item from the current subscription, then records that request. */
    private void requestOneAndRecord() {
        subscription.request(1);
        events.add(new RequestEvent(1));
    }
}
private static class NonFailingSubscriber implements Subscriber<RecordsRetrieved> { private static class NonFailingSubscriber implements Subscriber<RecordsRetrieved> {
final List<ProcessRecordsInput> received = new ArrayList<>(); final List<ProcessRecordsInput> received = new ArrayList<>();
Subscription subscription; Subscription subscription;