diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/RequestDetails.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/RequestDetails.java index cdb0d386..c32bf0e6 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/RequestDetails.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/RequestDetails.java @@ -2,12 +2,54 @@ package software.amazon.kinesis.common; import lombok.AllArgsConstructor; import lombok.Getter; +import lombok.NoArgsConstructor; import lombok.experimental.Accessors; -@Accessors(fluent = true) +import java.util.Optional; + +@Accessors(fluent=true) @Getter -@AllArgsConstructor public class RequestDetails { - private final String requestId; - private final String timestamp; + + /** + * Placeholder for logging when no successful request has been made. + */ + private static final String NONE = "NONE"; + + private final Optional requestId; + private final Optional timestamp; + + public RequestDetails() { + this(Optional.empty(), Optional.empty()); + } + + public RequestDetails(Optional requestId, Optional timestamp) { + this.requestId = requestId; + this.timestamp = timestamp; + } + + /** + * Gets last successful response's request id. + * + * @return requestId associated with last succesful response. + */ + public String getLastSuccessfulResponseRequestId() { + return requestId.orElse(NONE); + } + + /** + * Gets last successful response's timestamp. + * + * @return timestamp associated with last successful response. + */ + public String getLastSuccessfulResponseTimestamp() { + return timestamp.orElse(NONE); + } + + @Override + public String toString() { + return String.format("request id - %s, timestamp - %s", getLastSuccessfulResponseRequestId(), getLastSuccessfulResponseTimestamp()); + } + } + diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java index 3783d726..04aa12d3 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java @@ -129,8 +129,8 @@ class ShardConsumerSubscriber implements Subscriber { Duration timeSinceLastResponse = Duration.between(lastRequestTime, now); if (timeSinceLastResponse.toMillis() > maxTimeBetweenRequests) { log.error( - "{}: Last request was dispatched at {}, but no response as of {} ({}). Cancelling subscription, and restarting. Last successful response: RequestId - {}, Timestamp - {}", - shardConsumer.shardInfo().shardId(), lastRequestTime, now, timeSinceLastResponse, recordsPublisher.getLastSuccessfulResponseRequestId(), recordsPublisher.getLastSuccessfulResponseTimestamp()); + "{}: Last request was dispatched at {}, but no response as of {} ({}). Cancelling subscription, and restarting. Last successful response details: {}", + shardConsumer.shardInfo().shardId(), lastRequestTime, now, timeSinceLastResponse, recordsPublisher.getLastSuccessfulResponseDetails()); cancel(); // Start the subscription again which will update the lastRequestTime as well. diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsPublisher.java index 6634b2fa..b53b8c88 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsPublisher.java @@ -28,10 +28,7 @@ import java.util.Optional; */ public interface RecordsPublisher extends Publisher { - /** - * Placeholder for logging when no successful request has been made. - */ - String NONE = "NONE"; + /** * Initializes the publisher with where to start processing. If there is a stored sequence number the publisher will @@ -61,25 +58,7 @@ public interface RecordsPublisher extends Publisher { * * @return details associated with last successful response. */ - Optional getLastSuccessfulResponseDetails(); - - /** - * Gets last successful response's request id. - * - * @return requestId associated with last succesful response. - */ - default String getLastSuccessfulResponseRequestId() { - return getLastSuccessfulResponseDetails().map(RequestDetails::requestId).orElse(NONE); - } - - /** - * Gets last successful response's timestamp. - * - * @return timestamp associated with last successful response. - */ - default String getLastSuccessfulResponseTimestamp() { - return getLastSuccessfulResponseDetails().map(RequestDetails::timestamp).orElse(NONE); - } + RequestDetails getLastSuccessfulResponseDetails(); /** * Notify the publisher on receipt of a data event. diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java index d9d30108..7b852b03 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java @@ -89,7 +89,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { private BlockingQueue recordsDeliveryQueue = new LinkedBlockingQueue<>( MAX_EVENT_BURST_FROM_SERVICE); - private Optional lastSuccessfulRequestDetails = Optional.empty(); + private RequestDetails lastSuccessfulRequestDetails = new RequestDetails(); @Override public void start(ExtendedSequenceNumber extendedSequenceNumber, @@ -148,12 +148,12 @@ public class FanOutRecordsPublisher implements RecordsPublisher { } @Override - public Optional getLastSuccessfulResponseDetails() { + public RequestDetails getLastSuccessfulResponseDetails() { return lastSuccessfulRequestDetails; } - public void setLastSuccessfulResponseDetails(RequestDetails requestDetails) { - lastSuccessfulRequestDetails = Optional.of(requestDetails); + private void setLastSuccessfulResponseDetails(RequestDetails requestDetails) { + lastSuccessfulRequestDetails = requestDetails; } // This method is not thread-safe. You need to acquire a lock in the caller in order to execute this. @@ -218,8 +218,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher { } } catch (IllegalStateException e) { - log.warn("{}: Unable to enqueue the payload due to capacity restrictions in delivery queue with remaining capacity {}. Last successful response: RequestId - {}, Timestamp - {}", - shardId, recordsDeliveryQueue.remainingCapacity(), getLastSuccessfulResponseRequestId(), getLastSuccessfulResponseTimestamp()); + log.warn("{}: Unable to enqueue the payload due to capacity restrictions in delivery queue with remaining capacity {}. Last successful response details: {}", + shardId, recordsDeliveryQueue.remainingCapacity(), lastSuccessfulRequestDetails); throw e; } catch (Throwable t) { log.error("{}: Unable to deliver event to the shard consumer.", shardId, t); @@ -303,13 +303,12 @@ public class FanOutRecordsPublisher implements RecordsPublisher { if(hasValidFlow()) { log.warn( "{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ {} id: {} -- Subscriber is null." + - " Last successful response: RequestId - {}, Timestamp - {}", - shardId, flow.connectionStartedAt, flow.subscribeToShardId, getLastSuccessfulResponseRequestId(), getLastSuccessfulResponseTimestamp()); + " Last successful response details: {}", shardId, flow.connectionStartedAt, + flow.subscribeToShardId, lastSuccessfulRequestDetails); } else { log.warn( "{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) -- Subscriber and flow are null." + - " Last successful response: RequestId - {}, Timestamp - {}", - shardId, getLastSuccessfulResponseRequestId(), getLastSuccessfulResponseTimestamp()); + " Last successful response details: {}", shardId, lastSuccessfulRequestDetails); } return; } @@ -321,8 +320,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher { if (flow != null) { String logMessage = String.format( "%s: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ %s id: %s -- %s." + - " Last successful response: RequestId - {}, Timestamp - {}", - shardId, flow.connectionStartedAt, flow.subscribeToShardId, category.throwableTypeString, getLastSuccessfulResponseRequestId(), getLastSuccessfulResponseTimestamp()); + " Last successful response details: {}", + shardId, flow.connectionStartedAt, flow.subscribeToShardId, category.throwableTypeString, lastSuccessfulRequestDetails); switch (category.throwableType) { case READ_TIMEOUT: log.debug(logMessage, propagationThrowable); @@ -347,7 +346,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { handleFlowError(propagationThrowable, triggeringFlow); } catch (Throwable innerThrowable) { log.warn("{}: Exception while calling subscriber.onError. Last successful response:" + - " RequestId - {}, Timestamp - {}", shardId, innerThrowable, getLastSuccessfulResponseRequestId(), getLastSuccessfulResponseTimestamp()); + "Last successful response details: {}", shardId, innerThrowable, lastSuccessfulRequestDetails); } subscriber = null; flow = null; @@ -369,7 +368,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { // Clear any lingering records in the queue. if (!recordsDeliveryQueue.isEmpty()) { log.warn("{}: Found non-empty queue while starting subscription. This indicates unsuccessful clean up of" - + "previous subscription - {}. Last successful response: RequestId - {}, Timestamp - {}", shardId, subscribeToShardId, getLastSuccessfulResponseRequestId(), getLastSuccessfulResponseTimestamp()); + + "previous subscription - {}. Last successful response details: {}", shardId, subscribeToShardId, lastSuccessfulRequestDetails); recordsDeliveryQueue.clear(); } } @@ -480,7 +479,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { bufferCurrentEventAndScheduleIfRequired(recordsRetrieved, triggeringFlow); } catch (Throwable t) { log.warn("{}: Unable to buffer or schedule onNext for subscriber. Failing publisher." + - " Last successful response: RequestId - {}, Timestamp - {}", shardId, getLastSuccessfulResponseRequestId(), getLastSuccessfulResponseTimestamp()); + " Last successful response details: {}", shardId, lastSuccessfulRequestDetails); errorOccurred(triggeringFlow, t); } } @@ -576,8 +575,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher { synchronized (lockObject) { if (subscriber != s) { log.warn( - "{}: (FanOutRecordsPublisher/Subscription#request) - Rejected an attempt to request({}), because subscribers don't match. Last successful response: RequestId - {}, Timestamp - {}", - shardId, n, getLastSuccessfulResponseRequestId(), getLastSuccessfulResponseTimestamp()); + "{}: (FanOutRecordsPublisher/Subscription#request) - Rejected an attempt to request({}), because subscribers don't match. Last successful response details: {}", + shardId, n, lastSuccessfulRequestDetails); return; } if (flow == null) { @@ -603,14 +602,14 @@ public class FanOutRecordsPublisher implements RecordsPublisher { synchronized (lockObject) { if (subscriber != s) { log.warn( - "{}: (FanOutRecordsPublisher/Subscription#cancel) - Rejected attempt to cancel subscription, because subscribers don't match. Last successful response: RequestId - {}, Timestamp - {}", - shardId, getLastSuccessfulResponseRequestId(), getLastSuccessfulResponseTimestamp()); + "{}: (FanOutRecordsPublisher/Subscription#cancel) - Rejected attempt to cancel subscription, because subscribers don't match. Last successful response details: {}", + shardId, lastSuccessfulRequestDetails); return; } if (!hasValidSubscriber()) { log.warn( - "{}: (FanOutRecordsPublisher/Subscription#cancel) - Cancelled called even with an invalid subscriber. Last successful response: RequestId - {}, Timestamp - {}", - shardId, getLastSuccessfulResponseRequestId(), getLastSuccessfulResponseTimestamp()); + "{}: (FanOutRecordsPublisher/Subscription#cancel) - Cancelled called even with an invalid subscriber. Last successful response details: {}", + shardId, lastSuccessfulRequestDetails); } subscriber = null; if (flow != null) { @@ -738,10 +737,10 @@ public class FanOutRecordsPublisher implements RecordsPublisher { @Override public void responseReceived(SubscribeToShardResponse response) { - log.debug("{}: [SubscriptionLifetime]: (RecordFlow#responseReceived) @ {} id: {} -- Response received. Last successful response: RequestId - {}, Timestamp - {}, ExtendedLast successful response: RequestId - {}, Timestamp - {}", + log.debug("{}: [SubscriptionLifetime]: (RecordFlow#responseReceived) @ {} id: {} -- Response received. Last successful response details: {}, ExtendedLast successful response: RequestId - {}, Timestamp - {}", parent.shardId, connectionStartedAt, subscribeToShardId, response.responseMetadata().requestId(), response.responseMetadata().extendedRequestId(), connectionStartedAt); - lastSuccessfulResponseDetails = new RequestDetails(response.responseMetadata().requestId(), connectionStartedAt.toString()); + lastSuccessfulResponseDetails = new RequestDetails(Optional.of(response.responseMetadata().requestId()), Optional.of(connectionStartedAt.toString())); parent.setLastSuccessfulResponseDetails(lastSuccessfulResponseDetails); } @@ -804,9 +803,9 @@ public class FanOutRecordsPublisher implements RecordsPublisher { .add(new RecordsRetrievedContext(Either.right(subscriptionShutdownEvent), this, Instant.now())); } catch (Exception e) { log.warn( - "{}: Unable to enqueue the {} shutdown event due to capacity restrictions in delivery queue with remaining capacity {}. Ignoring. Last successful response: RequestId - {}, Timestamp - {}", + "{}: Unable to enqueue the {} shutdown event due to capacity restrictions in delivery queue with remaining capacity {}. Ignoring. Last successful response details: {}", parent.shardId, subscriptionShutdownEvent.getEventIdentifier(), parent.recordsDeliveryQueue.remainingCapacity(), - subscriptionShutdownEvent.getShutdownEventThrowableOptional(), parent.getLastSuccessfulResponseRequestId(), parent.getLastSuccessfulResponseTimestamp()); + subscriptionShutdownEvent.getShutdownEventThrowableOptional(), parent.lastSuccessfulRequestDetails); } } @@ -822,14 +821,14 @@ public class FanOutRecordsPublisher implements RecordsPublisher { // the // subscription, which was cancelled for a reason (usually queue overflow). // - log.warn("{}: complete called on a cancelled subscription. Ignoring completion. Last successful response: RequestId - {}, Timestamp - {}", - parent.shardId, parent.getLastSuccessfulResponseRequestId(), parent.getLastSuccessfulResponseTimestamp()); + log.warn("{}: complete called on a cancelled subscription. Ignoring completion. Last successful response details: {}", + parent.shardId, parent.lastSuccessfulRequestDetails); return; } if (this.isDisposed) { log.warn( - "{}: [SubscriptionLifetime]: (RecordFlow#complete) @ {} id: {} -- This flow has been disposed not dispatching completion. Last successful response: RequestId - {}, Timestamp - {}", - parent.shardId, connectionStartedAt, subscribeToShardId, parent.getLastSuccessfulResponseRequestId(), parent.getLastSuccessfulResponseTimestamp()); + "{}: [SubscriptionLifetime]: (RecordFlow#complete) @ {} id: {} -- This flow has been disposed not dispatching completion. Last successful response details: {}", + parent.shardId, connectionStartedAt, subscribeToShardId, parent.lastSuccessfulRequestDetails); return; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/BlockingRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/BlockingRecordsPublisher.java index 484a8b2b..b4710e7b 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/BlockingRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/BlockingRecordsPublisher.java @@ -42,6 +42,7 @@ public class BlockingRecordsPublisher implements RecordsPublisher { private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; private Subscriber subscriber; + private final RequestDetails lastSuccessfulResponseDetails = new RequestDetails(); public BlockingRecordsPublisher(final int maxRecordsPerCall, final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) { @@ -73,8 +74,8 @@ public class BlockingRecordsPublisher implements RecordsPublisher { } @Override - public Optional getLastSuccessfulResponseDetails() { - return Optional.empty(); + public RequestDetails getLastSuccessfulResponseDetails() { + return lastSuccessfulResponseDetails; } @Override diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java index 82fd0e7d..48ed7746 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java @@ -100,6 +100,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { private boolean wasReset = false; private Instant lastEventDeliveryTime = Instant.EPOCH; + private final RequestDetails lastSuccessfulRequestDetails = new RequestDetails(); @Data @Accessors(fluent = true) @@ -263,8 +264,8 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { } @Override - public Optional getLastSuccessfulResponseDetails() { - return Optional.empty(); + public RequestDetails getLastSuccessfulResponseDetails() { + return lastSuccessfulRequestDetails; } @Override diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java index eeb68c0a..04f8d276 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java @@ -79,6 +79,8 @@ public class ShardConsumerSubscriberTest { private static final String TERMINAL_MARKER = "Terminal"; + private final RequestDetails lastSuccessfulRequestDetails = new RequestDetails(); + @Mock private ShardConsumer shardConsumer; @Mock @@ -559,8 +561,8 @@ public class ShardConsumerSubscriberTest { } @Override - public Optional getLastSuccessfulResponseDetails() { - return Optional.empty(); + public RequestDetails getLastSuccessfulResponseDetails() { + return lastSuccessfulRequestDetails; } @Override diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java index a679a66d..dabf9f8f 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java @@ -89,6 +89,7 @@ public class ShardConsumerTest { private final String shardId = "shardId-0-0"; private final String concurrencyToken = "TestToken"; + private final RequestDetails lastSuccessfulRequestDetails = new RequestDetails(); private ShardInfo shardInfo; private TaskExecutionListenerInput initialTaskInput; private TaskExecutionListenerInput processTaskInput; @@ -211,8 +212,8 @@ public class ShardConsumerTest { } @Override - public Optional getLastSuccessfulResponseDetails() { - return Optional.empty(); + public RequestDetails getLastSuccessfulResponseDetails() { + return lastSuccessfulRequestDetails; } @Override