From 67bbdfdeec171417d2b63a75b60d94a1100c7418 Mon Sep 17 00:00:00 2001 From: jushkem <20001595+jushkem@users.noreply.github.com> Date: Fri, 14 Feb 2020 12:19:57 -0800 Subject: [PATCH] Fixing naming. --- .../amazon/kinesis/common/RequestDetails.java | 14 +++---- .../lifecycle/ShardConsumerSubscriber.java | 4 +- .../kinesis/retrieval/RecordsPublisher.java | 6 +-- .../fanout/FanOutRecordsPublisher.java | 42 +++++++++---------- .../polling/BlockingRecordsPublisher.java | 6 +-- .../polling/PrefetchRecordsPublisher.java | 2 +- .../ShardConsumerSubscriberTest.java | 2 +- .../kinesis/lifecycle/ShardConsumerTest.java | 2 +- 8 files changed, 39 insertions(+), 39 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/RequestDetails.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/RequestDetails.java index c32bf0e6..bc170920 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/RequestDetails.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/RequestDetails.java @@ -29,26 +29,26 @@ public class RequestDetails { } /** - * Gets last successful response's request id. + * Gets last successful request's request id. * - * @return requestId associated with last succesful response. + * @return requestId associated with last successful request. */ - public String getLastSuccessfulResponseRequestId() { + public String getRequestId() { return requestId.orElse(NONE); } /** - * Gets last successful response's timestamp. + * Gets last successful request's timestamp. * - * @return timestamp associated with last successful response. + * @return timestamp associated with last successful request. */ - public String getLastSuccessfulResponseTimestamp() { + public String getTimestamp() { return timestamp.orElse(NONE); } @Override public String toString() { - return String.format("request id - %s, timestamp - %s", getLastSuccessfulResponseRequestId(), getLastSuccessfulResponseTimestamp()); + return String.format("request id - %s, timestamp - %s", getRequestId(), getTimestamp()); } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java index 04aa12d3..4c05ac94 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java @@ -129,8 +129,8 @@ class ShardConsumerSubscriber implements Subscriber { Duration timeSinceLastResponse = Duration.between(lastRequestTime, now); if (timeSinceLastResponse.toMillis() > maxTimeBetweenRequests) { log.error( - "{}: Last request was dispatched at {}, but no response as of {} ({}). Cancelling subscription, and restarting. Last successful response details: {}", - shardConsumer.shardInfo().shardId(), lastRequestTime, now, timeSinceLastResponse, recordsPublisher.getLastSuccessfulResponseDetails()); + "{}: Last request was dispatched at {}, but no response as of {} ({}). Cancelling subscription, and restarting. Last successful request details -- {}", + shardConsumer.shardInfo().shardId(), lastRequestTime, now, timeSinceLastResponse, recordsPublisher.getLastSuccessfulRequestDetails()); cancel(); // Start the subscription again which will update the lastRequestTime as well. diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsPublisher.java index b53b8c88..5fc029b4 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsPublisher.java @@ -54,11 +54,11 @@ public interface RecordsPublisher extends Publisher { void shutdown(); /** - * Gets last successful response details. + * Gets last successful request details. * - * @return details associated with last successful response. + * @return details associated with last successful request. */ - RequestDetails getLastSuccessfulResponseDetails(); + RequestDetails getLastSuccessfulRequestDetails(); /** * Notify the publisher on receipt of a data event. diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java index 7b852b03..d4c8f485 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java @@ -148,11 +148,11 @@ public class FanOutRecordsPublisher implements RecordsPublisher { } @Override - public RequestDetails getLastSuccessfulResponseDetails() { + public RequestDetails getLastSuccessfulRequestDetails() { return lastSuccessfulRequestDetails; } - private void setLastSuccessfulResponseDetails(RequestDetails requestDetails) { + private void setLastSuccessfulRequestDetails(RequestDetails requestDetails) { lastSuccessfulRequestDetails = requestDetails; } @@ -218,7 +218,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { } } catch (IllegalStateException e) { - log.warn("{}: Unable to enqueue the payload due to capacity restrictions in delivery queue with remaining capacity {}. Last successful response details: {}", + log.warn("{}: Unable to enqueue the payload due to capacity restrictions in delivery queue with remaining capacity {}. Last successful request details -- {}", shardId, recordsDeliveryQueue.remainingCapacity(), lastSuccessfulRequestDetails); throw e; } catch (Throwable t) { @@ -303,12 +303,12 @@ public class FanOutRecordsPublisher implements RecordsPublisher { if(hasValidFlow()) { log.warn( "{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ {} id: {} -- Subscriber is null." + - " Last successful response details: {}", shardId, flow.connectionStartedAt, + " Last successful request details -- {}", shardId, flow.connectionStartedAt, flow.subscribeToShardId, lastSuccessfulRequestDetails); } else { log.warn( "{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) -- Subscriber and flow are null." + - " Last successful response details: {}", shardId, lastSuccessfulRequestDetails); + " Last successful request details -- {}", shardId, lastSuccessfulRequestDetails); } return; } @@ -320,7 +320,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { if (flow != null) { String logMessage = String.format( "%s: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ %s id: %s -- %s." + - " Last successful response details: {}", + " Last successful request details -- {}", shardId, flow.connectionStartedAt, flow.subscribeToShardId, category.throwableTypeString, lastSuccessfulRequestDetails); switch (category.throwableType) { case READ_TIMEOUT: @@ -345,8 +345,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher { try { handleFlowError(propagationThrowable, triggeringFlow); } catch (Throwable innerThrowable) { - log.warn("{}: Exception while calling subscriber.onError. Last successful response:" + - "Last successful response details: {}", shardId, innerThrowable, lastSuccessfulRequestDetails); + log.warn("{}: Exception while calling subscriber.onError. Last successful request:" + + "Last successful request details -- {}", shardId, innerThrowable, lastSuccessfulRequestDetails); } subscriber = null; flow = null; @@ -368,7 +368,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { // Clear any lingering records in the queue. if (!recordsDeliveryQueue.isEmpty()) { log.warn("{}: Found non-empty queue while starting subscription. This indicates unsuccessful clean up of" - + "previous subscription - {}. Last successful response details: {}", shardId, subscribeToShardId, lastSuccessfulRequestDetails); + + "previous subscription - {}. Last successful request details -- {}", shardId, subscribeToShardId, lastSuccessfulRequestDetails); recordsDeliveryQueue.clear(); } } @@ -479,7 +479,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { bufferCurrentEventAndScheduleIfRequired(recordsRetrieved, triggeringFlow); } catch (Throwable t) { log.warn("{}: Unable to buffer or schedule onNext for subscriber. Failing publisher." + - " Last successful response details: {}", shardId, lastSuccessfulRequestDetails); + " Last successful request details -- {}", shardId, lastSuccessfulRequestDetails); errorOccurred(triggeringFlow, t); } } @@ -575,7 +575,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { synchronized (lockObject) { if (subscriber != s) { log.warn( - "{}: (FanOutRecordsPublisher/Subscription#request) - Rejected an attempt to request({}), because subscribers don't match. Last successful response details: {}", + "{}: (FanOutRecordsPublisher/Subscription#request) - Rejected an attempt to request({}), because subscribers don't match. Last successful request details -- {}", shardId, n, lastSuccessfulRequestDetails); return; } @@ -602,13 +602,13 @@ public class FanOutRecordsPublisher implements RecordsPublisher { synchronized (lockObject) { if (subscriber != s) { log.warn( - "{}: (FanOutRecordsPublisher/Subscription#cancel) - Rejected attempt to cancel subscription, because subscribers don't match. Last successful response details: {}", + "{}: (FanOutRecordsPublisher/Subscription#cancel) - Rejected attempt to cancel subscription, because subscribers don't match. Last successful request details -- {}", shardId, lastSuccessfulRequestDetails); return; } if (!hasValidSubscriber()) { log.warn( - "{}: (FanOutRecordsPublisher/Subscription#cancel) - Cancelled called even with an invalid subscriber. Last successful response details: {}", + "{}: (FanOutRecordsPublisher/Subscription#cancel) - Cancelled called even with an invalid subscriber. Last successful request details -- {}", shardId, lastSuccessfulRequestDetails); } subscriber = null; @@ -699,7 +699,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { private boolean isDisposed = false; private boolean isErrorDispatched = false; private boolean isCancelled = false; - private RequestDetails lastSuccessfulResponseDetails; + private RequestDetails recordFlowLastSuccessfuRequestDetails; @Override public void onEventStream(SdkPublisher publisher) { @@ -737,11 +737,11 @@ public class FanOutRecordsPublisher implements RecordsPublisher { @Override public void responseReceived(SubscribeToShardResponse response) { - log.debug("{}: [SubscriptionLifetime]: (RecordFlow#responseReceived) @ {} id: {} -- Response received. Last successful response details: {}, ExtendedLast successful response: RequestId - {}, Timestamp - {}", - parent.shardId, connectionStartedAt, subscribeToShardId, response.responseMetadata().requestId(), response.responseMetadata().extendedRequestId(), connectionStartedAt); + log.debug("{}: [SubscriptionLifetime]: (RecordFlow#responseReceived) @ {} id: {} -- Response received. RequestId - {} -- Last successful request details -- {} ", + parent.shardId, connectionStartedAt, subscribeToShardId, response.responseMetadata().requestId(), parent.lastSuccessfulRequestDetails); - lastSuccessfulResponseDetails = new RequestDetails(Optional.of(response.responseMetadata().requestId()), Optional.of(connectionStartedAt.toString())); - parent.setLastSuccessfulResponseDetails(lastSuccessfulResponseDetails); + recordFlowLastSuccessfuRequestDetails = new RequestDetails(Optional.of(response.responseMetadata().requestId()), Optional.of(connectionStartedAt.toString())); + parent.setLastSuccessfulRequestDetails(this.recordFlowLastSuccessfuRequestDetails); } @Override @@ -803,7 +803,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { .add(new RecordsRetrievedContext(Either.right(subscriptionShutdownEvent), this, Instant.now())); } catch (Exception e) { log.warn( - "{}: Unable to enqueue the {} shutdown event due to capacity restrictions in delivery queue with remaining capacity {}. Ignoring. Last successful response details: {}", + "{}: Unable to enqueue the {} shutdown event due to capacity restrictions in delivery queue with remaining capacity {}. Ignoring. Last successful request details -- {}", parent.shardId, subscriptionShutdownEvent.getEventIdentifier(), parent.recordsDeliveryQueue.remainingCapacity(), subscriptionShutdownEvent.getShutdownEventThrowableOptional(), parent.lastSuccessfulRequestDetails); } @@ -821,13 +821,13 @@ public class FanOutRecordsPublisher implements RecordsPublisher { // the // subscription, which was cancelled for a reason (usually queue overflow). // - log.warn("{}: complete called on a cancelled subscription. Ignoring completion. Last successful response details: {}", + log.warn("{}: complete called on a cancelled subscription. Ignoring completion. Last successful request details -- {}", parent.shardId, parent.lastSuccessfulRequestDetails); return; } if (this.isDisposed) { log.warn( - "{}: [SubscriptionLifetime]: (RecordFlow#complete) @ {} id: {} -- This flow has been disposed not dispatching completion. Last successful response details: {}", + "{}: [SubscriptionLifetime]: (RecordFlow#complete) @ {} id: {} -- This flow has been disposed not dispatching completion. Last successful request details -- {}", parent.shardId, connectionStartedAt, subscribeToShardId, parent.lastSuccessfulRequestDetails); return; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/BlockingRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/BlockingRecordsPublisher.java index b4710e7b..23004b51 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/BlockingRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/BlockingRecordsPublisher.java @@ -42,7 +42,7 @@ public class BlockingRecordsPublisher implements RecordsPublisher { private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; private Subscriber subscriber; - private final RequestDetails lastSuccessfulResponseDetails = new RequestDetails(); + private final RequestDetails lastSuccessfulRequestDetails = new RequestDetails(); public BlockingRecordsPublisher(final int maxRecordsPerCall, final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) { @@ -74,8 +74,8 @@ public class BlockingRecordsPublisher implements RecordsPublisher { } @Override - public RequestDetails getLastSuccessfulResponseDetails() { - return lastSuccessfulResponseDetails; + public RequestDetails getLastSuccessfulRequestDetails() { + return lastSuccessfulRequestDetails; } @Override diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java index 48ed7746..a0773bf8 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java @@ -264,7 +264,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { } @Override - public RequestDetails getLastSuccessfulResponseDetails() { + public RequestDetails getLastSuccessfulRequestDetails() { return lastSuccessfulRequestDetails; } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java index 04f8d276..78e09fa1 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java @@ -561,7 +561,7 @@ public class ShardConsumerSubscriberTest { } @Override - public RequestDetails getLastSuccessfulResponseDetails() { + public RequestDetails getLastSuccessfulRequestDetails() { return lastSuccessfulRequestDetails; } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java index dabf9f8f..46677fb9 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java @@ -212,7 +212,7 @@ public class ShardConsumerTest { } @Override - public RequestDetails getLastSuccessfulResponseDetails() { + public RequestDetails getLastSuccessfulRequestDetails() { return lastSuccessfulRequestDetails; }