Addressing PR comments.
This commit is contained in:
parent
2f6e5d32c1
commit
dd15ea2a57
8 changed files with 91 additions and 66 deletions
|
|
@ -2,12 +2,54 @@ package software.amazon.kinesis.common;
|
|||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.experimental.Accessors;
|
||||
|
||||
@Accessors(fluent = true)
|
||||
import java.util.Optional;
|
||||
|
||||
@Accessors(fluent=true)
|
||||
@Getter
|
||||
@AllArgsConstructor
|
||||
public class RequestDetails {
|
||||
private final String requestId;
|
||||
private final String timestamp;
|
||||
|
||||
/**
|
||||
* Placeholder for logging when no successful request has been made.
|
||||
*/
|
||||
private static final String NONE = "NONE";
|
||||
|
||||
private final Optional<String> requestId;
|
||||
private final Optional<String> timestamp;
|
||||
|
||||
public RequestDetails() {
|
||||
this(Optional.empty(), Optional.empty());
|
||||
}
|
||||
|
||||
public RequestDetails(Optional<String> requestId, Optional<String> timestamp) {
|
||||
this.requestId = requestId;
|
||||
this.timestamp = timestamp;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets last successful response's request id.
|
||||
*
|
||||
* @return requestId associated with last succesful response.
|
||||
*/
|
||||
public String getLastSuccessfulResponseRequestId() {
|
||||
return requestId.orElse(NONE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets last successful response's timestamp.
|
||||
*
|
||||
* @return timestamp associated with last successful response.
|
||||
*/
|
||||
public String getLastSuccessfulResponseTimestamp() {
|
||||
return timestamp.orElse(NONE);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return String.format("request id - %s, timestamp - %s", getLastSuccessfulResponseRequestId(), getLastSuccessfulResponseTimestamp());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -129,8 +129,8 @@ class ShardConsumerSubscriber implements Subscriber<RecordsRetrieved> {
|
|||
Duration timeSinceLastResponse = Duration.between(lastRequestTime, now);
|
||||
if (timeSinceLastResponse.toMillis() > maxTimeBetweenRequests) {
|
||||
log.error(
|
||||
"{}: Last request was dispatched at {}, but no response as of {} ({}). Cancelling subscription, and restarting. Last successful response: RequestId - {}, Timestamp - {}",
|
||||
shardConsumer.shardInfo().shardId(), lastRequestTime, now, timeSinceLastResponse, recordsPublisher.getLastSuccessfulResponseRequestId(), recordsPublisher.getLastSuccessfulResponseTimestamp());
|
||||
"{}: Last request was dispatched at {}, but no response as of {} ({}). Cancelling subscription, and restarting. Last successful response details: {}",
|
||||
shardConsumer.shardInfo().shardId(), lastRequestTime, now, timeSinceLastResponse, recordsPublisher.getLastSuccessfulResponseDetails());
|
||||
cancel();
|
||||
|
||||
// Start the subscription again which will update the lastRequestTime as well.
|
||||
|
|
|
|||
|
|
@ -28,10 +28,7 @@ import java.util.Optional;
|
|||
*/
|
||||
public interface RecordsPublisher extends Publisher<RecordsRetrieved> {
|
||||
|
||||
/**
|
||||
* Placeholder for logging when no successful request has been made.
|
||||
*/
|
||||
String NONE = "NONE";
|
||||
|
||||
|
||||
/**
|
||||
* Initializes the publisher with where to start processing. If there is a stored sequence number the publisher will
|
||||
|
|
@ -61,25 +58,7 @@ public interface RecordsPublisher extends Publisher<RecordsRetrieved> {
|
|||
*
|
||||
* @return details associated with last successful response.
|
||||
*/
|
||||
Optional<RequestDetails> getLastSuccessfulResponseDetails();
|
||||
|
||||
/**
|
||||
* Gets last successful response's request id.
|
||||
*
|
||||
* @return requestId associated with last succesful response.
|
||||
*/
|
||||
default String getLastSuccessfulResponseRequestId() {
|
||||
return getLastSuccessfulResponseDetails().map(RequestDetails::requestId).orElse(NONE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets last successful response's timestamp.
|
||||
*
|
||||
* @return timestamp associated with last successful response.
|
||||
*/
|
||||
default String getLastSuccessfulResponseTimestamp() {
|
||||
return getLastSuccessfulResponseDetails().map(RequestDetails::timestamp).orElse(NONE);
|
||||
}
|
||||
RequestDetails getLastSuccessfulResponseDetails();
|
||||
|
||||
/**
|
||||
* Notify the publisher on receipt of a data event.
|
||||
|
|
|
|||
|
|
@ -89,7 +89,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
|
|||
private BlockingQueue<RecordsRetrievedContext> recordsDeliveryQueue = new LinkedBlockingQueue<>(
|
||||
MAX_EVENT_BURST_FROM_SERVICE);
|
||||
|
||||
private Optional<RequestDetails> lastSuccessfulRequestDetails = Optional.empty();
|
||||
private RequestDetails lastSuccessfulRequestDetails = new RequestDetails();
|
||||
|
||||
@Override
|
||||
public void start(ExtendedSequenceNumber extendedSequenceNumber,
|
||||
|
|
@ -148,12 +148,12 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Optional<RequestDetails> getLastSuccessfulResponseDetails() {
|
||||
public RequestDetails getLastSuccessfulResponseDetails() {
|
||||
return lastSuccessfulRequestDetails;
|
||||
}
|
||||
|
||||
public void setLastSuccessfulResponseDetails(RequestDetails requestDetails) {
|
||||
lastSuccessfulRequestDetails = Optional.of(requestDetails);
|
||||
private void setLastSuccessfulResponseDetails(RequestDetails requestDetails) {
|
||||
lastSuccessfulRequestDetails = requestDetails;
|
||||
}
|
||||
|
||||
// This method is not thread-safe. You need to acquire a lock in the caller in order to execute this.
|
||||
|
|
@ -218,8 +218,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
|
|||
}
|
||||
} catch (IllegalStateException e) {
|
||||
|
||||
log.warn("{}: Unable to enqueue the payload due to capacity restrictions in delivery queue with remaining capacity {}. Last successful response: RequestId - {}, Timestamp - {}",
|
||||
shardId, recordsDeliveryQueue.remainingCapacity(), getLastSuccessfulResponseRequestId(), getLastSuccessfulResponseTimestamp());
|
||||
log.warn("{}: Unable to enqueue the payload due to capacity restrictions in delivery queue with remaining capacity {}. Last successful response details: {}",
|
||||
shardId, recordsDeliveryQueue.remainingCapacity(), lastSuccessfulRequestDetails);
|
||||
throw e;
|
||||
} catch (Throwable t) {
|
||||
log.error("{}: Unable to deliver event to the shard consumer.", shardId, t);
|
||||
|
|
@ -303,13 +303,12 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
|
|||
if(hasValidFlow()) {
|
||||
log.warn(
|
||||
"{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ {} id: {} -- Subscriber is null." +
|
||||
" Last successful response: RequestId - {}, Timestamp - {}",
|
||||
shardId, flow.connectionStartedAt, flow.subscribeToShardId, getLastSuccessfulResponseRequestId(), getLastSuccessfulResponseTimestamp());
|
||||
" Last successful response details: {}", shardId, flow.connectionStartedAt,
|
||||
flow.subscribeToShardId, lastSuccessfulRequestDetails);
|
||||
} else {
|
||||
log.warn(
|
||||
"{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) -- Subscriber and flow are null." +
|
||||
" Last successful response: RequestId - {}, Timestamp - {}",
|
||||
shardId, getLastSuccessfulResponseRequestId(), getLastSuccessfulResponseTimestamp());
|
||||
" Last successful response details: {}", shardId, lastSuccessfulRequestDetails);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
|
@ -321,8 +320,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
|
|||
if (flow != null) {
|
||||
String logMessage = String.format(
|
||||
"%s: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ %s id: %s -- %s." +
|
||||
" Last successful response: RequestId - {}, Timestamp - {}",
|
||||
shardId, flow.connectionStartedAt, flow.subscribeToShardId, category.throwableTypeString, getLastSuccessfulResponseRequestId(), getLastSuccessfulResponseTimestamp());
|
||||
" Last successful response details: {}",
|
||||
shardId, flow.connectionStartedAt, flow.subscribeToShardId, category.throwableTypeString, lastSuccessfulRequestDetails);
|
||||
switch (category.throwableType) {
|
||||
case READ_TIMEOUT:
|
||||
log.debug(logMessage, propagationThrowable);
|
||||
|
|
@ -347,7 +346,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
|
|||
handleFlowError(propagationThrowable, triggeringFlow);
|
||||
} catch (Throwable innerThrowable) {
|
||||
log.warn("{}: Exception while calling subscriber.onError. Last successful response:" +
|
||||
" RequestId - {}, Timestamp - {}", shardId, innerThrowable, getLastSuccessfulResponseRequestId(), getLastSuccessfulResponseTimestamp());
|
||||
"Last successful response details: {}", shardId, innerThrowable, lastSuccessfulRequestDetails);
|
||||
}
|
||||
subscriber = null;
|
||||
flow = null;
|
||||
|
|
@ -369,7 +368,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
|
|||
// Clear any lingering records in the queue.
|
||||
if (!recordsDeliveryQueue.isEmpty()) {
|
||||
log.warn("{}: Found non-empty queue while starting subscription. This indicates unsuccessful clean up of"
|
||||
+ "previous subscription - {}. Last successful response: RequestId - {}, Timestamp - {}", shardId, subscribeToShardId, getLastSuccessfulResponseRequestId(), getLastSuccessfulResponseTimestamp());
|
||||
+ "previous subscription - {}. Last successful response details: {}", shardId, subscribeToShardId, lastSuccessfulRequestDetails);
|
||||
recordsDeliveryQueue.clear();
|
||||
}
|
||||
}
|
||||
|
|
@ -480,7 +479,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
|
|||
bufferCurrentEventAndScheduleIfRequired(recordsRetrieved, triggeringFlow);
|
||||
} catch (Throwable t) {
|
||||
log.warn("{}: Unable to buffer or schedule onNext for subscriber. Failing publisher." +
|
||||
" Last successful response: RequestId - {}, Timestamp - {}", shardId, getLastSuccessfulResponseRequestId(), getLastSuccessfulResponseTimestamp());
|
||||
" Last successful response details: {}", shardId, lastSuccessfulRequestDetails);
|
||||
errorOccurred(triggeringFlow, t);
|
||||
}
|
||||
}
|
||||
|
|
@ -576,8 +575,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
|
|||
synchronized (lockObject) {
|
||||
if (subscriber != s) {
|
||||
log.warn(
|
||||
"{}: (FanOutRecordsPublisher/Subscription#request) - Rejected an attempt to request({}), because subscribers don't match. Last successful response: RequestId - {}, Timestamp - {}",
|
||||
shardId, n, getLastSuccessfulResponseRequestId(), getLastSuccessfulResponseTimestamp());
|
||||
"{}: (FanOutRecordsPublisher/Subscription#request) - Rejected an attempt to request({}), because subscribers don't match. Last successful response details: {}",
|
||||
shardId, n, lastSuccessfulRequestDetails);
|
||||
return;
|
||||
}
|
||||
if (flow == null) {
|
||||
|
|
@ -603,14 +602,14 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
|
|||
synchronized (lockObject) {
|
||||
if (subscriber != s) {
|
||||
log.warn(
|
||||
"{}: (FanOutRecordsPublisher/Subscription#cancel) - Rejected attempt to cancel subscription, because subscribers don't match. Last successful response: RequestId - {}, Timestamp - {}",
|
||||
shardId, getLastSuccessfulResponseRequestId(), getLastSuccessfulResponseTimestamp());
|
||||
"{}: (FanOutRecordsPublisher/Subscription#cancel) - Rejected attempt to cancel subscription, because subscribers don't match. Last successful response details: {}",
|
||||
shardId, lastSuccessfulRequestDetails);
|
||||
return;
|
||||
}
|
||||
if (!hasValidSubscriber()) {
|
||||
log.warn(
|
||||
"{}: (FanOutRecordsPublisher/Subscription#cancel) - Cancelled called even with an invalid subscriber. Last successful response: RequestId - {}, Timestamp - {}",
|
||||
shardId, getLastSuccessfulResponseRequestId(), getLastSuccessfulResponseTimestamp());
|
||||
"{}: (FanOutRecordsPublisher/Subscription#cancel) - Cancelled called even with an invalid subscriber. Last successful response details: {}",
|
||||
shardId, lastSuccessfulRequestDetails);
|
||||
}
|
||||
subscriber = null;
|
||||
if (flow != null) {
|
||||
|
|
@ -738,10 +737,10 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
|
|||
|
||||
@Override
|
||||
public void responseReceived(SubscribeToShardResponse response) {
|
||||
log.debug("{}: [SubscriptionLifetime]: (RecordFlow#responseReceived) @ {} id: {} -- Response received. Last successful response: RequestId - {}, Timestamp - {}, ExtendedLast successful response: RequestId - {}, Timestamp - {}",
|
||||
log.debug("{}: [SubscriptionLifetime]: (RecordFlow#responseReceived) @ {} id: {} -- Response received. Last successful response details: {}, ExtendedLast successful response: RequestId - {}, Timestamp - {}",
|
||||
parent.shardId, connectionStartedAt, subscribeToShardId, response.responseMetadata().requestId(), response.responseMetadata().extendedRequestId(), connectionStartedAt);
|
||||
|
||||
lastSuccessfulResponseDetails = new RequestDetails(response.responseMetadata().requestId(), connectionStartedAt.toString());
|
||||
lastSuccessfulResponseDetails = new RequestDetails(Optional.of(response.responseMetadata().requestId()), Optional.of(connectionStartedAt.toString()));
|
||||
parent.setLastSuccessfulResponseDetails(lastSuccessfulResponseDetails);
|
||||
}
|
||||
|
||||
|
|
@ -804,9 +803,9 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
|
|||
.add(new RecordsRetrievedContext(Either.right(subscriptionShutdownEvent), this, Instant.now()));
|
||||
} catch (Exception e) {
|
||||
log.warn(
|
||||
"{}: Unable to enqueue the {} shutdown event due to capacity restrictions in delivery queue with remaining capacity {}. Ignoring. Last successful response: RequestId - {}, Timestamp - {}",
|
||||
"{}: Unable to enqueue the {} shutdown event due to capacity restrictions in delivery queue with remaining capacity {}. Ignoring. Last successful response details: {}",
|
||||
parent.shardId, subscriptionShutdownEvent.getEventIdentifier(), parent.recordsDeliveryQueue.remainingCapacity(),
|
||||
subscriptionShutdownEvent.getShutdownEventThrowableOptional(), parent.getLastSuccessfulResponseRequestId(), parent.getLastSuccessfulResponseTimestamp());
|
||||
subscriptionShutdownEvent.getShutdownEventThrowableOptional(), parent.lastSuccessfulRequestDetails);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -822,14 +821,14 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
|
|||
// the
|
||||
// subscription, which was cancelled for a reason (usually queue overflow).
|
||||
//
|
||||
log.warn("{}: complete called on a cancelled subscription. Ignoring completion. Last successful response: RequestId - {}, Timestamp - {}",
|
||||
parent.shardId, parent.getLastSuccessfulResponseRequestId(), parent.getLastSuccessfulResponseTimestamp());
|
||||
log.warn("{}: complete called on a cancelled subscription. Ignoring completion. Last successful response details: {}",
|
||||
parent.shardId, parent.lastSuccessfulRequestDetails);
|
||||
return;
|
||||
}
|
||||
if (this.isDisposed) {
|
||||
log.warn(
|
||||
"{}: [SubscriptionLifetime]: (RecordFlow#complete) @ {} id: {} -- This flow has been disposed not dispatching completion. Last successful response: RequestId - {}, Timestamp - {}",
|
||||
parent.shardId, connectionStartedAt, subscribeToShardId, parent.getLastSuccessfulResponseRequestId(), parent.getLastSuccessfulResponseTimestamp());
|
||||
"{}: [SubscriptionLifetime]: (RecordFlow#complete) @ {} id: {} -- This flow has been disposed not dispatching completion. Last successful response details: {}",
|
||||
parent.shardId, connectionStartedAt, subscribeToShardId, parent.lastSuccessfulRequestDetails);
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -42,6 +42,7 @@ public class BlockingRecordsPublisher implements RecordsPublisher {
|
|||
private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
|
||||
|
||||
private Subscriber<? super RecordsRetrieved> subscriber;
|
||||
private final RequestDetails lastSuccessfulResponseDetails = new RequestDetails();
|
||||
|
||||
public BlockingRecordsPublisher(final int maxRecordsPerCall,
|
||||
final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) {
|
||||
|
|
@ -73,8 +74,8 @@ public class BlockingRecordsPublisher implements RecordsPublisher {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Optional<RequestDetails> getLastSuccessfulResponseDetails() {
|
||||
return Optional.empty();
|
||||
public RequestDetails getLastSuccessfulResponseDetails() {
|
||||
return lastSuccessfulResponseDetails;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -100,6 +100,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
|
|||
private boolean wasReset = false;
|
||||
|
||||
private Instant lastEventDeliveryTime = Instant.EPOCH;
|
||||
private final RequestDetails lastSuccessfulRequestDetails = new RequestDetails();
|
||||
|
||||
@Data
|
||||
@Accessors(fluent = true)
|
||||
|
|
@ -263,8 +264,8 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Optional<RequestDetails> getLastSuccessfulResponseDetails() {
|
||||
return Optional.empty();
|
||||
public RequestDetails getLastSuccessfulResponseDetails() {
|
||||
return lastSuccessfulRequestDetails;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -79,6 +79,8 @@ public class ShardConsumerSubscriberTest {
|
|||
|
||||
private static final String TERMINAL_MARKER = "Terminal";
|
||||
|
||||
private final RequestDetails lastSuccessfulRequestDetails = new RequestDetails();
|
||||
|
||||
@Mock
|
||||
private ShardConsumer shardConsumer;
|
||||
@Mock
|
||||
|
|
@ -559,8 +561,8 @@ public class ShardConsumerSubscriberTest {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Optional<RequestDetails> getLastSuccessfulResponseDetails() {
|
||||
return Optional.empty();
|
||||
public RequestDetails getLastSuccessfulResponseDetails() {
|
||||
return lastSuccessfulRequestDetails;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -89,6 +89,7 @@ public class ShardConsumerTest {
|
|||
|
||||
private final String shardId = "shardId-0-0";
|
||||
private final String concurrencyToken = "TestToken";
|
||||
private final RequestDetails lastSuccessfulRequestDetails = new RequestDetails();
|
||||
private ShardInfo shardInfo;
|
||||
private TaskExecutionListenerInput initialTaskInput;
|
||||
private TaskExecutionListenerInput processTaskInput;
|
||||
|
|
@ -211,8 +212,8 @@ public class ShardConsumerTest {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Optional<RequestDetails> getLastSuccessfulResponseDetails() {
|
||||
return Optional.empty();
|
||||
public RequestDetails getLastSuccessfulResponseDetails() {
|
||||
return lastSuccessfulRequestDetails;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
Loading…
Reference in a new issue