Fixing naming.

This commit is contained in:
jushkem 2020-02-14 12:19:57 -08:00
parent dd15ea2a57
commit 67bbdfdeec
8 changed files with 39 additions and 39 deletions

View file

@ -29,26 +29,26 @@ public class RequestDetails {
} }
/** /**
* Gets last successful response's request id. * Gets last successful request's request id.
* *
* @return requestId associated with last succesful response. * @return requestId associated with last successful request.
*/ */
public String getLastSuccessfulResponseRequestId() { public String getRequestId() {
return requestId.orElse(NONE); return requestId.orElse(NONE);
} }
/** /**
* Gets last successful response's timestamp. * Gets last successful request's timestamp.
* *
* @return timestamp associated with last successful response. * @return timestamp associated with last successful request.
*/ */
public String getLastSuccessfulResponseTimestamp() { public String getTimestamp() {
return timestamp.orElse(NONE); return timestamp.orElse(NONE);
} }
@Override @Override
public String toString() { public String toString() {
return String.format("request id - %s, timestamp - %s", getLastSuccessfulResponseRequestId(), getLastSuccessfulResponseTimestamp()); return String.format("request id - %s, timestamp - %s", getRequestId(), getTimestamp());
} }
} }

View file

@ -129,8 +129,8 @@ class ShardConsumerSubscriber implements Subscriber<RecordsRetrieved> {
Duration timeSinceLastResponse = Duration.between(lastRequestTime, now); Duration timeSinceLastResponse = Duration.between(lastRequestTime, now);
if (timeSinceLastResponse.toMillis() > maxTimeBetweenRequests) { if (timeSinceLastResponse.toMillis() > maxTimeBetweenRequests) {
log.error( log.error(
"{}: Last request was dispatched at {}, but no response as of {} ({}). Cancelling subscription, and restarting. Last successful response details: {}", "{}: Last request was dispatched at {}, but no response as of {} ({}). Cancelling subscription, and restarting. Last successful request details -- {}",
shardConsumer.shardInfo().shardId(), lastRequestTime, now, timeSinceLastResponse, recordsPublisher.getLastSuccessfulResponseDetails()); shardConsumer.shardInfo().shardId(), lastRequestTime, now, timeSinceLastResponse, recordsPublisher.getLastSuccessfulRequestDetails());
cancel(); cancel();
// Start the subscription again which will update the lastRequestTime as well. // Start the subscription again which will update the lastRequestTime as well.

View file

@ -54,11 +54,11 @@ public interface RecordsPublisher extends Publisher<RecordsRetrieved> {
void shutdown(); void shutdown();
/** /**
* Gets last successful response details. * Gets last successful request details.
* *
* @return details associated with last successful response. * @return details associated with last successful request.
*/ */
RequestDetails getLastSuccessfulResponseDetails(); RequestDetails getLastSuccessfulRequestDetails();
/** /**
* Notify the publisher on receipt of a data event. * Notify the publisher on receipt of a data event.

View file

@ -148,11 +148,11 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
} }
@Override @Override
public RequestDetails getLastSuccessfulResponseDetails() { public RequestDetails getLastSuccessfulRequestDetails() {
return lastSuccessfulRequestDetails; return lastSuccessfulRequestDetails;
} }
private void setLastSuccessfulResponseDetails(RequestDetails requestDetails) { private void setLastSuccessfulRequestDetails(RequestDetails requestDetails) {
lastSuccessfulRequestDetails = requestDetails; lastSuccessfulRequestDetails = requestDetails;
} }
@ -218,7 +218,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
} }
} catch (IllegalStateException e) { } catch (IllegalStateException e) {
log.warn("{}: Unable to enqueue the payload due to capacity restrictions in delivery queue with remaining capacity {}. Last successful response details: {}", log.warn("{}: Unable to enqueue the payload due to capacity restrictions in delivery queue with remaining capacity {}. Last successful request details -- {}",
shardId, recordsDeliveryQueue.remainingCapacity(), lastSuccessfulRequestDetails); shardId, recordsDeliveryQueue.remainingCapacity(), lastSuccessfulRequestDetails);
throw e; throw e;
} catch (Throwable t) { } catch (Throwable t) {
@ -303,12 +303,12 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
if(hasValidFlow()) { if(hasValidFlow()) {
log.warn( log.warn(
"{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ {} id: {} -- Subscriber is null." + "{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ {} id: {} -- Subscriber is null." +
" Last successful response details: {}", shardId, flow.connectionStartedAt, " Last successful request details -- {}", shardId, flow.connectionStartedAt,
flow.subscribeToShardId, lastSuccessfulRequestDetails); flow.subscribeToShardId, lastSuccessfulRequestDetails);
} else { } else {
log.warn( log.warn(
"{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) -- Subscriber and flow are null." + "{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) -- Subscriber and flow are null." +
" Last successful response details: {}", shardId, lastSuccessfulRequestDetails); " Last successful request details -- {}", shardId, lastSuccessfulRequestDetails);
} }
return; return;
} }
@ -320,7 +320,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
if (flow != null) { if (flow != null) {
String logMessage = String.format( String logMessage = String.format(
"%s: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ %s id: %s -- %s." + "%s: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ %s id: %s -- %s." +
" Last successful response details: {}", " Last successful request details -- {}",
shardId, flow.connectionStartedAt, flow.subscribeToShardId, category.throwableTypeString, lastSuccessfulRequestDetails); shardId, flow.connectionStartedAt, flow.subscribeToShardId, category.throwableTypeString, lastSuccessfulRequestDetails);
switch (category.throwableType) { switch (category.throwableType) {
case READ_TIMEOUT: case READ_TIMEOUT:
@ -345,8 +345,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
try { try {
handleFlowError(propagationThrowable, triggeringFlow); handleFlowError(propagationThrowable, triggeringFlow);
} catch (Throwable innerThrowable) { } catch (Throwable innerThrowable) {
log.warn("{}: Exception while calling subscriber.onError. Last successful response:" + log.warn("{}: Exception while calling subscriber.onError. Last successful request:" +
"Last successful response details: {}", shardId, innerThrowable, lastSuccessfulRequestDetails); "Last successful request details -- {}", shardId, innerThrowable, lastSuccessfulRequestDetails);
} }
subscriber = null; subscriber = null;
flow = null; flow = null;
@ -368,7 +368,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
// Clear any lingering records in the queue. // Clear any lingering records in the queue.
if (!recordsDeliveryQueue.isEmpty()) { if (!recordsDeliveryQueue.isEmpty()) {
log.warn("{}: Found non-empty queue while starting subscription. This indicates unsuccessful clean up of" log.warn("{}: Found non-empty queue while starting subscription. This indicates unsuccessful clean up of"
+ "previous subscription - {}. Last successful response details: {}", shardId, subscribeToShardId, lastSuccessfulRequestDetails); + "previous subscription - {}. Last successful request details -- {}", shardId, subscribeToShardId, lastSuccessfulRequestDetails);
recordsDeliveryQueue.clear(); recordsDeliveryQueue.clear();
} }
} }
@ -479,7 +479,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
bufferCurrentEventAndScheduleIfRequired(recordsRetrieved, triggeringFlow); bufferCurrentEventAndScheduleIfRequired(recordsRetrieved, triggeringFlow);
} catch (Throwable t) { } catch (Throwable t) {
log.warn("{}: Unable to buffer or schedule onNext for subscriber. Failing publisher." + log.warn("{}: Unable to buffer or schedule onNext for subscriber. Failing publisher." +
" Last successful response details: {}", shardId, lastSuccessfulRequestDetails); " Last successful request details -- {}", shardId, lastSuccessfulRequestDetails);
errorOccurred(triggeringFlow, t); errorOccurred(triggeringFlow, t);
} }
} }
@ -575,7 +575,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
synchronized (lockObject) { synchronized (lockObject) {
if (subscriber != s) { if (subscriber != s) {
log.warn( log.warn(
"{}: (FanOutRecordsPublisher/Subscription#request) - Rejected an attempt to request({}), because subscribers don't match. Last successful response details: {}", "{}: (FanOutRecordsPublisher/Subscription#request) - Rejected an attempt to request({}), because subscribers don't match. Last successful request details -- {}",
shardId, n, lastSuccessfulRequestDetails); shardId, n, lastSuccessfulRequestDetails);
return; return;
} }
@ -602,13 +602,13 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
synchronized (lockObject) { synchronized (lockObject) {
if (subscriber != s) { if (subscriber != s) {
log.warn( log.warn(
"{}: (FanOutRecordsPublisher/Subscription#cancel) - Rejected attempt to cancel subscription, because subscribers don't match. Last successful response details: {}", "{}: (FanOutRecordsPublisher/Subscription#cancel) - Rejected attempt to cancel subscription, because subscribers don't match. Last successful request details -- {}",
shardId, lastSuccessfulRequestDetails); shardId, lastSuccessfulRequestDetails);
return; return;
} }
if (!hasValidSubscriber()) { if (!hasValidSubscriber()) {
log.warn( log.warn(
"{}: (FanOutRecordsPublisher/Subscription#cancel) - Cancelled called even with an invalid subscriber. Last successful response details: {}", "{}: (FanOutRecordsPublisher/Subscription#cancel) - Cancelled called even with an invalid subscriber. Last successful request details -- {}",
shardId, lastSuccessfulRequestDetails); shardId, lastSuccessfulRequestDetails);
} }
subscriber = null; subscriber = null;
@ -699,7 +699,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
private boolean isDisposed = false; private boolean isDisposed = false;
private boolean isErrorDispatched = false; private boolean isErrorDispatched = false;
private boolean isCancelled = false; private boolean isCancelled = false;
private RequestDetails lastSuccessfulResponseDetails; private RequestDetails recordFlowLastSuccessfuRequestDetails;
@Override @Override
public void onEventStream(SdkPublisher<SubscribeToShardEventStream> publisher) { public void onEventStream(SdkPublisher<SubscribeToShardEventStream> publisher) {
@ -737,11 +737,11 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
@Override @Override
public void responseReceived(SubscribeToShardResponse response) { public void responseReceived(SubscribeToShardResponse response) {
log.debug("{}: [SubscriptionLifetime]: (RecordFlow#responseReceived) @ {} id: {} -- Response received. Last successful response details: {}, ExtendedLast successful response: RequestId - {}, Timestamp - {}", log.debug("{}: [SubscriptionLifetime]: (RecordFlow#responseReceived) @ {} id: {} -- Response received. RequestId - {} -- Last successful request details -- {} ",
parent.shardId, connectionStartedAt, subscribeToShardId, response.responseMetadata().requestId(), response.responseMetadata().extendedRequestId(), connectionStartedAt); parent.shardId, connectionStartedAt, subscribeToShardId, response.responseMetadata().requestId(), parent.lastSuccessfulRequestDetails);
lastSuccessfulResponseDetails = new RequestDetails(Optional.of(response.responseMetadata().requestId()), Optional.of(connectionStartedAt.toString())); recordFlowLastSuccessfuRequestDetails = new RequestDetails(Optional.of(response.responseMetadata().requestId()), Optional.of(connectionStartedAt.toString()));
parent.setLastSuccessfulResponseDetails(lastSuccessfulResponseDetails); parent.setLastSuccessfulRequestDetails(this.recordFlowLastSuccessfuRequestDetails);
} }
@Override @Override
@ -803,7 +803,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
.add(new RecordsRetrievedContext(Either.right(subscriptionShutdownEvent), this, Instant.now())); .add(new RecordsRetrievedContext(Either.right(subscriptionShutdownEvent), this, Instant.now()));
} catch (Exception e) { } catch (Exception e) {
log.warn( log.warn(
"{}: Unable to enqueue the {} shutdown event due to capacity restrictions in delivery queue with remaining capacity {}. Ignoring. Last successful response details: {}", "{}: Unable to enqueue the {} shutdown event due to capacity restrictions in delivery queue with remaining capacity {}. Ignoring. Last successful request details -- {}",
parent.shardId, subscriptionShutdownEvent.getEventIdentifier(), parent.recordsDeliveryQueue.remainingCapacity(), parent.shardId, subscriptionShutdownEvent.getEventIdentifier(), parent.recordsDeliveryQueue.remainingCapacity(),
subscriptionShutdownEvent.getShutdownEventThrowableOptional(), parent.lastSuccessfulRequestDetails); subscriptionShutdownEvent.getShutdownEventThrowableOptional(), parent.lastSuccessfulRequestDetails);
} }
@ -821,13 +821,13 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
// the // the
// subscription, which was cancelled for a reason (usually queue overflow). // subscription, which was cancelled for a reason (usually queue overflow).
// //
log.warn("{}: complete called on a cancelled subscription. Ignoring completion. Last successful response details: {}", log.warn("{}: complete called on a cancelled subscription. Ignoring completion. Last successful request details -- {}",
parent.shardId, parent.lastSuccessfulRequestDetails); parent.shardId, parent.lastSuccessfulRequestDetails);
return; return;
} }
if (this.isDisposed) { if (this.isDisposed) {
log.warn( log.warn(
"{}: [SubscriptionLifetime]: (RecordFlow#complete) @ {} id: {} -- This flow has been disposed not dispatching completion. Last successful response details: {}", "{}: [SubscriptionLifetime]: (RecordFlow#complete) @ {} id: {} -- This flow has been disposed not dispatching completion. Last successful request details -- {}",
parent.shardId, connectionStartedAt, subscribeToShardId, parent.lastSuccessfulRequestDetails); parent.shardId, connectionStartedAt, subscribeToShardId, parent.lastSuccessfulRequestDetails);
return; return;
} }

View file

@ -42,7 +42,7 @@ public class BlockingRecordsPublisher implements RecordsPublisher {
private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
private Subscriber<? super RecordsRetrieved> subscriber; private Subscriber<? super RecordsRetrieved> subscriber;
private final RequestDetails lastSuccessfulResponseDetails = new RequestDetails(); private final RequestDetails lastSuccessfulRequestDetails = new RequestDetails();
public BlockingRecordsPublisher(final int maxRecordsPerCall, public BlockingRecordsPublisher(final int maxRecordsPerCall,
final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) { final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) {
@ -74,8 +74,8 @@ public class BlockingRecordsPublisher implements RecordsPublisher {
} }
@Override @Override
public RequestDetails getLastSuccessfulResponseDetails() { public RequestDetails getLastSuccessfulRequestDetails() {
return lastSuccessfulResponseDetails; return lastSuccessfulRequestDetails;
} }
@Override @Override

View file

@ -264,7 +264,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
} }
@Override @Override
public RequestDetails getLastSuccessfulResponseDetails() { public RequestDetails getLastSuccessfulRequestDetails() {
return lastSuccessfulRequestDetails; return lastSuccessfulRequestDetails;
} }

View file

@ -561,7 +561,7 @@ public class ShardConsumerSubscriberTest {
} }
@Override @Override
public RequestDetails getLastSuccessfulResponseDetails() { public RequestDetails getLastSuccessfulRequestDetails() {
return lastSuccessfulRequestDetails; return lastSuccessfulRequestDetails;
} }

View file

@ -212,7 +212,7 @@ public class ShardConsumerTest {
} }
@Override @Override
public RequestDetails getLastSuccessfulResponseDetails() { public RequestDetails getLastSuccessfulRequestDetails() {
return lastSuccessfulRequestDetails; return lastSuccessfulRequestDetails;
} }