Addressing PR comments.

This commit is contained in:
jushkem 2020-02-04 12:09:54 -08:00
parent a459ec6499
commit 0d45170734
8 changed files with 186 additions and 47 deletions

View file

@ -0,0 +1,17 @@
package software.amazon.kinesis.common;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.experimental.Accessors;
import software.amazon.awssdk.services.kinesis.model.KinesisResponseMetadata;
import java.time.Instant;
import java.util.Optional;
@Accessors(fluent = true)
@Getter
@AllArgsConstructor
public class RequestDetails {
private final String requestId;
private final String timestamp;
}

View file

@ -129,8 +129,8 @@ class ShardConsumerSubscriber implements Subscriber<RecordsRetrieved> {
Duration timeSinceLastResponse = Duration.between(lastRequestTime, now);
if (timeSinceLastResponse.toMillis() > maxTimeBetweenRequests) {
log.error(
"{}: Last request was dispatched at {}, but no response as of {} ({}). Cancelling subscription, and restarting.",
shardConsumer.shardInfo().shardId(), lastRequestTime, now, timeSinceLastResponse, recordsPublisher.getLastRequestId());
"{}: Last request was dispatched at {}, but no response as of {} ({}). Cancelling subscription, and restarting. Last successful response: RequestId - {}, Timestamp - {}",
shardConsumer.shardInfo().shardId(), lastRequestTime, now, timeSinceLastResponse, recordsPublisher.getLastSuccessfulResponseRequestId(), recordsPublisher.getLastSuccessfulResponseTimestamp());
cancel();
// Start the subscription again which will update the lastRequestTime as well.

View file

@ -15,20 +15,23 @@
package software.amazon.kinesis.retrieval;
import lombok.Getter;
import lombok.Setter;
import org.reactivestreams.Publisher;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.RequestDetails;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import java.util.Optional;
/**
* Provides a record publisher that will retrieve records from Kinesis for processing
*/
public abstract class RecordsPublisher implements Publisher<RecordsRetrieved> {
public interface RecordsPublisher extends Publisher<RecordsRetrieved> {
@Getter @Setter
private String lastRequestId;
/**
* Placeholder for logging when no successful request has been made.
*/
String NONE = "NONE";
/**
* Initializes the publisher with where to start processing. If there is a stored sequence number the publisher will
@ -39,25 +42,47 @@ public abstract class RecordsPublisher implements Publisher<RecordsRetrieved> {
* @param initialPositionInStreamExtended
* if there is no sequence number the initial position to use
*/
public abstract void start(ExtendedSequenceNumber extendedSequenceNumber, InitialPositionInStreamExtended initialPositionInStreamExtended);
void start(ExtendedSequenceNumber extendedSequenceNumber, InitialPositionInStreamExtended initialPositionInStreamExtended);
/**
* Restart from the last accepted and processed
* @param recordsRetrieved the processRecordsInput to restart from
*/
public abstract void restartFrom(RecordsRetrieved recordsRetrieved);
void restartFrom(RecordsRetrieved recordsRetrieved);
/**
* Shutdowns the publisher. Once this method returns the publisher should no longer provide any records.
*/
public abstract void shutdown();
void shutdown();
/**
* Gets last successful response details.
*
* @return details associated with last successful response.
*/
Optional<RequestDetails> getLastSuccessfulResponseDetails();
/**
* Gets last successful response's request id.
*
* @return requestId associated with last succesful response.
*/
String getLastSuccessfulResponseRequestId();
/**
* Gets last successful response's timestamp.
*
* @return timestamp associated with last successful response.
*/
String getLastSuccessfulResponseTimestamp();
/**
* Notify the publisher on receipt of a data event.
*
* @param ack acknowledgement received from the subscriber.
*/
public void notify(RecordsDeliveryAck ack) {
default void notify(RecordsDeliveryAck ack) {
throw new UnsupportedOperationException("RecordsPublisher does not support acknowledgement from Subscriber");
}
}

View file

@ -37,6 +37,7 @@ import software.amazon.awssdk.utils.Either;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.KinesisRequestsBuilder;
import software.amazon.kinesis.common.RequestDetails;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.retrieval.BatchUniqueIdentifier;
import software.amazon.kinesis.retrieval.IteratorBuilder;
@ -50,6 +51,7 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@ -61,7 +63,7 @@ import static software.amazon.kinesis.common.DiagnosticUtils.takeDelayedDelivery
@RequiredArgsConstructor
@Slf4j
@KinesisClientInternalApi
public class FanOutRecordsPublisher extends RecordsPublisher {
public class FanOutRecordsPublisher implements RecordsPublisher {
private static final ThrowableCategory ACQUIRE_TIMEOUT_CATEGORY = new ThrowableCategory(
ThrowableType.ACQUIRE_TIMEOUT);
private static final ThrowableCategory READ_TIMEOUT_CATEGORY = new ThrowableCategory(ThrowableType.READ_TIMEOUT);
@ -87,6 +89,8 @@ public class FanOutRecordsPublisher extends RecordsPublisher {
private BlockingQueue<RecordsRetrievedContext> recordsDeliveryQueue = new LinkedBlockingQueue<>(
MAX_EVENT_BURST_FROM_SERVICE);
private Optional<RequestDetails> lastSuccessfulRequestDetails = Optional.empty();
@Override
public void start(ExtendedSequenceNumber extendedSequenceNumber,
InitialPositionInStreamExtended initialPositionInStreamExtended) {
@ -143,6 +147,25 @@ public class FanOutRecordsPublisher extends RecordsPublisher {
}
}
@Override
public Optional<RequestDetails> getLastSuccessfulResponseDetails() {
return lastSuccessfulRequestDetails;
}
public void setLastSuccessfulResponseDetails(RequestDetails requestDetails) {
lastSuccessfulRequestDetails = Optional.of(requestDetails);
}
@Override
public String getLastSuccessfulResponseRequestId() {
return getLastSuccessfulResponseDetails().map(RequestDetails::requestId).orElse(NONE);
}
@Override
public String getLastSuccessfulResponseTimestamp() {
return getLastSuccessfulResponseDetails().map(RequestDetails::timestamp).orElse(NONE);
}
// This method is not thread-safe. You need to acquire a lock in the caller in order to execute this.
@VisibleForTesting
RecordFlow evictAckedEventAndScheduleNextEvent(RecordsDeliveryAck recordsDeliveryAck) {
@ -204,8 +227,9 @@ public class FanOutRecordsPublisher extends RecordsPublisher {
subscriber.onNext(recordsRetrieved);
}
} catch (IllegalStateException e) {
log.warn("{}: Unable to enqueue the payload due to capacity restrictions in delivery queue with remaining capacity {}. rid {}",
shardId, recordsDeliveryQueue.remainingCapacity(), getLastRequestId());
log.warn("{}: Unable to enqueue the payload due to capacity restrictions in delivery queue with remaining capacity {}. Last successful response: RequestId - {}, Timestamp - {}",
shardId, recordsDeliveryQueue.remainingCapacity(), getLastSuccessfulResponseRequestId(), getLastSuccessfulResponseTimestamp());
throw e;
} catch (Throwable t) {
log.error("{}: Unable to deliver event to the shard consumer.", shardId, t);
@ -288,12 +312,14 @@ public class FanOutRecordsPublisher extends RecordsPublisher {
if (!hasValidSubscriber()) {
if(hasValidFlow()) {
log.warn(
"{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ {} id: {} -- Subscriber is null. rid {}",
shardId, flow.connectionStartedAt, flow.subscribeToShardId, getLastRequestId());
"{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ {} id: {} -- Subscriber is null." +
" Last successful response: RequestId - {}, Timestamp - {}",
shardId, flow.connectionStartedAt, flow.subscribeToShardId, getLastSuccessfulResponseRequestId(), getLastSuccessfulResponseTimestamp());
} else {
log.warn(
"{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) -- Subscriber and flow are null. rid {}",
shardId, getLastRequestId());
"{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) -- Subscriber and flow are null." +
" Last successful response: RequestId - {}, Timestamp - {}",
shardId, getLastSuccessfulResponseRequestId(), getLastSuccessfulResponseTimestamp());
}
return;
}
@ -304,8 +330,9 @@ public class FanOutRecordsPublisher extends RecordsPublisher {
if (isActiveFlow(triggeringFlow)) {
if (flow != null) {
String logMessage = String.format(
"%s: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ %s id: %s -- %s. rid {}",
shardId, flow.connectionStartedAt, flow.subscribeToShardId, category.throwableTypeString, getLastRequestId());
"%s: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ %s id: %s -- %s." +
" Last successful response: RequestId - {}, Timestamp - {}",
shardId, flow.connectionStartedAt, flow.subscribeToShardId, category.throwableTypeString, getLastSuccessfulResponseRequestId(), getLastSuccessfulResponseTimestamp());
switch (category.throwableType) {
case READ_TIMEOUT:
log.debug(logMessage, propagationThrowable);
@ -329,7 +356,8 @@ public class FanOutRecordsPublisher extends RecordsPublisher {
try {
handleFlowError(propagationThrowable, triggeringFlow);
} catch (Throwable innerThrowable) {
log.warn("{}: Exception while calling subscriber.onError. rid {}", shardId, innerThrowable, getLastRequestId());
log.warn("{}: Exception while calling subscriber.onError. Last successful response:" +
" RequestId - {}, Timestamp - {}", shardId, innerThrowable, getLastSuccessfulResponseRequestId(), getLastSuccessfulResponseTimestamp());
}
subscriber = null;
flow = null;
@ -351,7 +379,7 @@ public class FanOutRecordsPublisher extends RecordsPublisher {
// Clear any lingering records in the queue.
if (!recordsDeliveryQueue.isEmpty()) {
log.warn("{}: Found non-empty queue while starting subscription. This indicates unsuccessful clean up of"
+ "previous subscription - {}. rid {}", shardId, subscribeToShardId, getLastRequestId());
+ "previous subscription - {}. Last successful response: RequestId - {}, Timestamp - {}", shardId, subscribeToShardId, getLastSuccessfulResponseRequestId(), getLastSuccessfulResponseTimestamp());
recordsDeliveryQueue.clear();
}
}
@ -461,7 +489,8 @@ public class FanOutRecordsPublisher extends RecordsPublisher {
try {
bufferCurrentEventAndScheduleIfRequired(recordsRetrieved, triggeringFlow);
} catch (Throwable t) {
log.warn("{}: Unable to buffer or schedule onNext for subscriber. Failing publisher. rid {}", shardId, getLastRequestId());
log.warn("{}: Unable to buffer or schedule onNext for subscriber. Failing publisher." +
" Last successful response: RequestId - {}, Timestamp - {}", shardId, getLastSuccessfulResponseRequestId(), getLastSuccessfulResponseTimestamp());
errorOccurred(triggeringFlow, t);
}
}
@ -557,8 +586,8 @@ public class FanOutRecordsPublisher extends RecordsPublisher {
synchronized (lockObject) {
if (subscriber != s) {
log.warn(
"{}: (FanOutRecordsPublisher/Subscription#request) - Rejected an attempt to request({}), because subscribers don't match. rid {}",
shardId, n, getLastRequestId());
"{}: (FanOutRecordsPublisher/Subscription#request) - Rejected an attempt to request({}), because subscribers don't match. Last successful response: RequestId - {}, Timestamp - {}",
shardId, n, getLastSuccessfulResponseRequestId(), getLastSuccessfulResponseTimestamp());
return;
}
if (flow == null) {
@ -584,14 +613,14 @@ public class FanOutRecordsPublisher extends RecordsPublisher {
synchronized (lockObject) {
if (subscriber != s) {
log.warn(
"{}: (FanOutRecordsPublisher/Subscription#cancel) - Rejected attempt to cancel subscription, because subscribers don't match. rid {}",
shardId, getLastRequestId());
"{}: (FanOutRecordsPublisher/Subscription#cancel) - Rejected attempt to cancel subscription, because subscribers don't match. Last successful response: RequestId - {}, Timestamp - {}",
shardId, getLastSuccessfulResponseRequestId(), getLastSuccessfulResponseTimestamp());
return;
}
if (!hasValidSubscriber()) {
log.warn(
"{}: (FanOutRecordsPublisher/Subscription#cancel) - Cancelled called even with an invalid subscriber. rid {}",
shardId, getLastRequestId());
"{}: (FanOutRecordsPublisher/Subscription#cancel) - Cancelled called even with an invalid subscriber. Last successful response: RequestId - {}, Timestamp - {}",
shardId, getLastSuccessfulResponseRequestId(), getLastSuccessfulResponseTimestamp());
}
subscriber = null;
if (flow != null) {
@ -681,6 +710,7 @@ public class FanOutRecordsPublisher extends RecordsPublisher {
private boolean isDisposed = false;
private boolean isErrorDispatched = false;
private boolean isCancelled = false;
private RequestDetails lastSuccessfulResponseDetails;
@Override
public void onEventStream(SdkPublisher<SubscribeToShardEventStream> publisher) {
@ -718,9 +748,11 @@ public class FanOutRecordsPublisher extends RecordsPublisher {
@Override
public void responseReceived(SubscribeToShardResponse response) {
parent.setLastRequestId(response.responseMetadata().requestId());
log.debug("{}: [SubscriptionLifetime]: (RecordFlow#responseReceived) @ {} id: {} -- Response received. rid: {}, erid: {}, sdkfields: {}",
parent.shardId, connectionStartedAt, subscribeToShardId, response.responseMetadata().requestId(), response.responseMetadata().extendedRequestId(), response.sdkFields());
log.debug("{}: [SubscriptionLifetime]: (RecordFlow#responseReceived) @ {} id: {} -- Response received. Last successful response: RequestId - {}, Timestamp - {}, ExtendedLast successful response: RequestId - {}, Timestamp - {}",
parent.shardId, connectionStartedAt, subscribeToShardId, response.responseMetadata().requestId(), response.responseMetadata().extendedRequestId(), connectionStartedAt);
lastSuccessfulResponseDetails = new RequestDetails(response.responseMetadata().requestId(), connectionStartedAt.toString());
parent.setLastSuccessfulResponseDetails(lastSuccessfulResponseDetails);
}
@Override
@ -782,11 +814,9 @@ public class FanOutRecordsPublisher extends RecordsPublisher {
.add(new RecordsRetrievedContext(Either.right(subscriptionShutdownEvent), this, Instant.now()));
} catch (Exception e) {
log.warn(
"{}: Unable to enqueue the {} shutdown event due to capacity restrictions in delivery queue with remaining capacity {}. Ignoring. rid {}",
parent.shardId, subscriptionShutdownEvent.getEventIdentifier(),
parent.recordsDeliveryQueue.remainingCapacity(),
subscriptionShutdownEvent.getShutdownEventThrowableOptional(),
parent.getLastRequestId());
"{}: Unable to enqueue the {} shutdown event due to capacity restrictions in delivery queue with remaining capacity {}. Ignoring. Last successful response: RequestId - {}, Timestamp - {}",
parent.shardId, subscriptionShutdownEvent.getEventIdentifier(), parent.recordsDeliveryQueue.remainingCapacity(),
subscriptionShutdownEvent.getShutdownEventThrowableOptional(), parent.getLastSuccessfulResponseRequestId(), parent.getLastSuccessfulResponseTimestamp());
}
}
@ -802,13 +832,14 @@ public class FanOutRecordsPublisher extends RecordsPublisher {
// the
// subscription, which was cancelled for a reason (usually queue overflow).
//
log.warn("{}: complete called on a cancelled subscription. Ignoring completion. rid {}", parent.shardId, parent.getLastRequestId());
log.warn("{}: complete called on a cancelled subscription. Ignoring completion. Last successful response: RequestId - {}, Timestamp - {}",
parent.shardId, parent.getLastSuccessfulResponseRequestId(), parent.getLastSuccessfulResponseTimestamp());
return;
}
if (this.isDisposed) {
log.warn(
"{}: [SubscriptionLifetime]: (RecordFlow#complete) @ {} id: {} -- This flow has been disposed not dispatching completion. rid {}",
parent.shardId, connectionStartedAt, subscribeToShardId, parent.getLastRequestId());
"{}: [SubscriptionLifetime]: (RecordFlow#complete) @ {} id: {} -- This flow has been disposed not dispatching completion. Last successful response: RequestId - {}, Timestamp - {}",
parent.shardId, connectionStartedAt, subscribeToShardId, parent.getLastSuccessfulResponseRequestId(), parent.getLastSuccessfulResponseTimestamp());
return;
}
@ -944,7 +975,6 @@ public class FanOutRecordsPublisher extends RecordsPublisher {
log.debug(
"{}: [SubscriptionLifetime]: (RecordSubscription#onComplete) @ {} id: {} -- Allowing RecordFlow to call onComplete",
parent.shardId, connectionStartedAt, subscribeToShardId);
}
}
}

View file

@ -16,6 +16,7 @@
package software.amazon.kinesis.retrieval.polling;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.reactivestreams.Subscriber;
@ -23,6 +24,7 @@ import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.kinesis.common.RequestDetails;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
import software.amazon.kinesis.retrieval.KinesisClientRecord;
@ -35,7 +37,7 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
* GetRecordsRetrievalStrategy class.
*/
@KinesisClientInternalApi
public class BlockingRecordsPublisher extends RecordsPublisher {
public class BlockingRecordsPublisher implements RecordsPublisher {
private final int maxRecordsPerCall;
private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
@ -70,6 +72,21 @@ public class BlockingRecordsPublisher extends RecordsPublisher {
getRecordsRetrievalStrategy.shutdown();
}
@Override
public Optional<RequestDetails> getLastSuccessfulResponseDetails() {
return Optional.empty();
}
@Override
public String getLastSuccessfulResponseRequestId() {
return NONE;
}
@Override
public String getLastSuccessfulResponseTimestamp() {
return NONE;
}
@Override
public void subscribe(Subscriber<? super RecordsRetrieved> s) {
subscriber = s;

View file

@ -18,6 +18,7 @@ package software.amazon.kinesis.retrieval.polling;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
@ -46,6 +47,7 @@ import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.RequestDetails;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.MetricsLevel;
@ -76,7 +78,7 @@ import static software.amazon.kinesis.common.DiagnosticUtils.takeDelayedDelivery
*/
@Slf4j
@KinesisClientInternalApi
public class PrefetchRecordsPublisher extends RecordsPublisher {
public class PrefetchRecordsPublisher implements RecordsPublisher {
private static final String EXPIRED_ITERATOR_METRIC = "ExpiredIterator";
private int maxPendingProcessRecordsInput;
private int maxByteSize;
@ -260,6 +262,21 @@ public class PrefetchRecordsPublisher extends RecordsPublisher {
started = false;
}
@Override
public Optional<RequestDetails> getLastSuccessfulResponseDetails() {
return Optional.empty();
}
@Override
public String getLastSuccessfulResponseRequestId() {
return NONE;
}
@Override
public String getLastSuccessfulResponseTimestamp() {
return NONE;
}
@Override
public void restartFrom(RecordsRetrieved recordsRetrieved) {
if (!(recordsRetrieved instanceof PrefetchRecordsRetrieved)) {

View file

@ -35,6 +35,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -61,6 +62,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.RequestDetails;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.retrieval.KinesisClientRecord;
@ -490,7 +492,7 @@ public class ShardConsumerSubscriberTest {
}
}
private class TestPublisher extends RecordsPublisher {
private class TestPublisher implements RecordsPublisher {
private final LinkedList<ResponseItem> responses = new LinkedList<>();
protected volatile long requested = 0;
@ -556,6 +558,21 @@ public class ShardConsumerSubscriberTest {
}
@Override
public Optional<RequestDetails> getLastSuccessfulResponseDetails() {
return Optional.empty();
}
@Override
public String getLastSuccessfulResponseRequestId() {
return NONE;
}
@Override
public String getLastSuccessfulResponseTimestamp() {
return NONE;
}
@Override
public void subscribe(Subscriber<? super RecordsRetrieved> s) {
subscriber = s;

View file

@ -70,6 +70,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.RequestDetails;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.TaskExecutionListenerInput;
@ -168,7 +169,7 @@ public class ShardConsumerTest {
assertThat(remainder.isEmpty(), equalTo(true));
}
private class TestPublisher extends RecordsPublisher {
private class TestPublisher implements RecordsPublisher {
final CyclicBarrier barrier = new CyclicBarrier(2);
final CyclicBarrier requestBarrier = new CyclicBarrier(2);
@ -209,6 +210,21 @@ public class ShardConsumerTest {
}
@Override
public Optional<RequestDetails> getLastSuccessfulResponseDetails() {
return Optional.empty();
}
@Override
public String getLastSuccessfulResponseRequestId() {
return NONE;
}
@Override
public String getLastSuccessfulResponseTimestamp() {
return NONE;
}
@Override
public void subscribe(Subscriber<? super RecordsRetrieved> s) {
subscriber = s;