From 6685a924d56e12f83c112d0f1e8c03726796554f Mon Sep 17 00:00:00 2001 From: Justin Pfifer Date: Wed, 6 Mar 2019 13:18:15 -0800 Subject: [PATCH] Added acquire timeout message, and a test. (#514) The test doesn't verify the message, but does verify that an acquire timeout triggers the FanOutRecordsPublisher to call logAcquireTimeoutMessage. --- .../fanout/FanOutRecordsPublisher.java | 29 +++- .../fanout/FanOutRecordsPublisherTest.java | 141 ++++++++++++++++-- 2 files changed, 154 insertions(+), 16 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java index 638becf1..703c3a3b 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java @@ -157,14 +157,21 @@ public class FanOutRecordsPublisher implements RecordsPublisher { String logMessage = String.format( "%s: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ %s id: %s -- %s", shardId, flow.connectionStartedAt, flow.subscribeToShardId, category.throwableTypeString); - if (category.throwableType.equals(ThrowableType.READ_TIMEOUT)) { + switch (category.throwableType) { + case READ_TIMEOUT: log.debug(logMessage, propagationThrowable); propagationThrowable = new RetryableRetrievalException(category.throwableTypeString, (Exception) propagationThrowable.getCause()); - } else { + break; + case ACQUIRE_TIMEOUT: + logAcquireTimeoutMessage(t); + // + // Fall through is intentional here as we still want to log the details of the exception + // + default: log.warn(logMessage, propagationThrowable); - } + } flow.cancel(); } log.debug("{}: availableQueueSpace zeroing from {}", shardId, availableQueueSpace); @@ -190,6 +197,13 @@ public class FanOutRecordsPublisher implements RecordsPublisher { } } + protected void logAcquireTimeoutMessage(Throwable t) { + log.error("An acquire timeout occurred which usually indicates that the KinesisAsyncClient supplied has a " + + "low maximum streams limit. " + + "Please use the software.amazon.kinesis.common.KinesisClientUtil to setup the client, " + + "or refer to the class to setup the client manually."); + } + private void handleFlowError(Throwable t) { if (t.getCause() instanceof ResourceNotFoundException) { log.debug( @@ -197,8 +211,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { shardId); FanoutRecordsRetrieved response = new FanoutRecordsRetrieved( ProcessRecordsInput.builder().records(Collections.emptyList()).isAtShardEnd(true).build(), null); - subscriber - .onNext(response); + subscriber.onNext(response); subscriber.onComplete(); } else { subscriber.onError(t); @@ -280,7 +293,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher { ProcessRecordsInput input = ProcessRecordsInput.builder().cacheEntryTime(Instant.now()) .millisBehindLatest(recordBatchEvent.millisBehindLatest()) .isAtShardEnd(recordBatchEvent.continuationSequenceNumber() == null).records(records).build(); - FanoutRecordsRetrieved recordsRetrieved = new FanoutRecordsRetrieved(input, recordBatchEvent.continuationSequenceNumber()); + FanoutRecordsRetrieved recordsRetrieved = new FanoutRecordsRetrieved(input, + recordBatchEvent.continuationSequenceNumber()); try { subscriber.onNext(recordsRetrieved); @@ -416,7 +430,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher { } subscriber = null; if (flow != null) { - log.debug("{}: [SubscriptionLifetime]: (FanOutRecordsPublisher/Subscription#cancel) @ {} id: {}", + log.debug( + "{}: [SubscriptionLifetime]: (FanOutRecordsPublisher/Subscription#cancel) @ {} id: {}", shardId, flow.connectionStartedAt, flow.subscribeToShardId); flow.cancel(); availableQueueSpace = 0; diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java index 18ea93b5..b12fc1a5 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java @@ -14,11 +14,14 @@ import static org.mockito.Mockito.verify; import java.nio.ByteBuffer; import java.time.Instant; import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletionException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import java.util.stream.Stream; -import io.netty.handler.timeout.ReadTimeoutException; import org.hamcrest.Description; import org.hamcrest.Matcher; import org.hamcrest.TypeSafeDiagnosingMatcher; @@ -30,9 +33,12 @@ import org.mockito.runners.MockitoJUnitRunner; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; +import io.netty.handler.timeout.ReadTimeoutException; +import lombok.Data; import lombok.extern.slf4j.Slf4j; import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.core.async.SdkPublisher; +import software.amazon.awssdk.core.exception.SdkClientException; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.Record; import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; @@ -71,13 +77,15 @@ public class FanOutRecordsPublisherTest { public void simpleTest() throws Exception { FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN); - ArgumentCaptor captor = ArgumentCaptor.forClass(FanOutRecordsPublisher.RecordSubscription.class); + ArgumentCaptor captor = ArgumentCaptor + .forClass(FanOutRecordsPublisher.RecordSubscription.class); ArgumentCaptor flowCaptor = ArgumentCaptor .forClass(FanOutRecordsPublisher.RecordFlow.class); doNothing().when(publisher).subscribe(captor.capture()); - source.start(ExtendedSequenceNumber.LATEST, InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)); + source.start(ExtendedSequenceNumber.LATEST, + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)); List receivedInput = new ArrayList<>(); @@ -138,13 +146,15 @@ public class FanOutRecordsPublisherTest { public void largeRequestTest() throws Exception { FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN); - ArgumentCaptor captor = ArgumentCaptor.forClass(FanOutRecordsPublisher.RecordSubscription.class); + ArgumentCaptor captor = ArgumentCaptor + .forClass(FanOutRecordsPublisher.RecordSubscription.class); ArgumentCaptor flowCaptor = ArgumentCaptor .forClass(FanOutRecordsPublisher.RecordFlow.class); doNothing().when(publisher).subscribe(captor.capture()); - source.start(ExtendedSequenceNumber.LATEST, InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)); + source.start(ExtendedSequenceNumber.LATEST, + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)); List receivedInput = new ArrayList<>(); @@ -288,9 +298,8 @@ public class FanOutRecordsPublisherTest { ArgumentCaptor nextFlowCaptor = ArgumentCaptor .forClass(FanOutRecordsPublisher.RecordFlow.class); - - SubscribeToShardRequest nextExpected = SubscribeToShardRequest.builder().consumerARN(CONSUMER_ARN).shardId(SHARD_ID) - .startingPosition(StartingPosition.builder().sequenceNumber("3") + SubscribeToShardRequest nextExpected = SubscribeToShardRequest.builder().consumerARN(CONSUMER_ARN) + .shardId(SHARD_ID).startingPosition(StartingPosition.builder().sequenceNumber("3") .type(ShardIteratorType.AFTER_SEQUENCE_NUMBER).build()) .build(); @@ -301,7 +310,6 @@ public class FanOutRecordsPublisherTest { nextFlowCaptor.getValue().onEventStream(publisher); nextSubscribeCaptor.getValue().onSubscribe(subscription); - List nextRecords = Stream.of(4, 5, 6).map(this::makeRecord).collect(Collectors.toList()); List nextMatchers = nextRecords.stream().map(KinesisClientRecordMatcher::new) .collect(Collectors.toList()); @@ -319,6 +327,60 @@ public class FanOutRecordsPublisherTest { } + @Test + public void acquireTimeoutTriggersLogMethodForActiveFlow() { + AtomicBoolean acquireTimeoutLogged = new AtomicBoolean(false); + FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN) { + @Override + protected void logAcquireTimeoutMessage(Throwable t) { + super.logAcquireTimeoutMessage(t); + acquireTimeoutLogged.set(true); + } + }; + + ArgumentCaptor captor = ArgumentCaptor + .forClass(FanOutRecordsPublisher.RecordSubscription.class); + ArgumentCaptor flowCaptor = ArgumentCaptor + .forClass(FanOutRecordsPublisher.RecordFlow.class); + + doNothing().when(publisher).subscribe(captor.capture()); + + source.start(ExtendedSequenceNumber.LATEST, + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)); + RecordingSubscriber subscriber = new RecordingSubscriber(); + source.subscribe(subscriber); + + verify(kinesisClient).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture()); + + Throwable exception = new CompletionException( + "software.amazon.awssdk.core.exception.SdkClientException", + SdkClientException.create(null, new Throwable( + "Acquire operation took longer than the configured maximum time. This indicates that a " + + "request cannot get a connection from the pool within the specified maximum time. " + + "This can be due to high request rate.\n" + + "Consider taking any of the following actions to mitigate the issue: increase max " + + "connections, increase acquire timeout, or slowing the request rate.\n" + + "Increasing the max connections can increase client throughput (unless the network " + + "interface is already fully utilized), but can eventually start to hit operation " + + "system limitations on the number of file descriptors used by the process. " + + "If you already are fully utilizing your network interface or cannot further " + + "increase your connection count, increasing the acquire timeout gives extra time " + + "for requests to acquire a connection before timing out. " + + "If the connections doesn't free up, the subsequent requests will still timeout.\n" + + "If the above mechanisms are not able to fix the issue, try smoothing out your " + + "requests so that large traffic bursts cannot overload the client, being more " + + "efficient with the number of times you need to call AWS, or by increasing the " + + "number of hosts sending requests."))); + + flowCaptor.getValue().exceptionOccurred(exception); + + Optional onErrorEvent = subscriber.events.stream().filter(e -> e instanceof OnErrorEvent).map(e -> (OnErrorEvent)e).findFirst(); + + assertThat(onErrorEvent, equalTo(Optional.of(new OnErrorEvent(exception)))); + assertThat(acquireTimeoutLogged.get(), equalTo(true)); + + } + private void verifyRecords(List clientRecordsList, List matchers) { assertThat(clientRecordsList.size(), equalTo(matchers.size())); for (int i = 0; i < clientRecordsList.size(); ++i) { @@ -326,6 +388,67 @@ public class FanOutRecordsPublisherTest { } } + private interface SubscriberEvent { + + } + + @Data + private static class SubscribeEvent implements SubscriberEvent { + final Subscription subscription; + } + + @Data + private static class OnNextEvent implements SubscriberEvent { + final RecordsRetrieved recordsRetrieved; + } + + @Data + private static class OnErrorEvent implements SubscriberEvent { + final Throwable throwable; + } + + @Data + private static class OnCompleteEvent implements SubscriberEvent { + + } + + @Data + private static class RequestEvent implements SubscriberEvent { + final long requested; + } + + private static class RecordingSubscriber implements Subscriber { + + final List events = new LinkedList<>(); + + Subscription subscription; + + @Override + public void onSubscribe(Subscription s) { + events.add(new SubscribeEvent(s)); + subscription = s; + subscription.request(1); + events.add(new RequestEvent(1)); + } + + @Override + public void onNext(RecordsRetrieved recordsRetrieved) { + events.add(new OnNextEvent(recordsRetrieved)); + subscription.request(1); + events.add(new RequestEvent(1)); + } + + @Override + public void onError(Throwable t) { + events.add(new OnErrorEvent(t)); + } + + @Override + public void onComplete() { + events.add(new OnCompleteEvent()); + } + } + private static class NonFailingSubscriber implements Subscriber { final List received = new ArrayList<>(); Subscription subscription;