Addressing PR comments.

This commit is contained in:
jushkem 2020-01-23 13:40:14 -08:00
parent 7dc6f37d2d
commit a459ec6499
7 changed files with 42 additions and 34 deletions

View file

@ -130,7 +130,7 @@ class ShardConsumerSubscriber implements Subscriber<RecordsRetrieved> {
if (timeSinceLastResponse.toMillis() > maxTimeBetweenRequests) { if (timeSinceLastResponse.toMillis() > maxTimeBetweenRequests) {
log.error( log.error(
"{}: Last request was dispatched at {}, but no response as of {} ({}). Cancelling subscription, and restarting.", "{}: Last request was dispatched at {}, but no response as of {} ({}). Cancelling subscription, and restarting.",
shardConsumer.shardInfo().shardId(), lastRequestTime, now, timeSinceLastResponse); shardConsumer.shardInfo().shardId(), lastRequestTime, now, timeSinceLastResponse, recordsPublisher.getLastRequestId());
cancel(); cancel();
// Start the subscription again which will update the lastRequestTime as well. // Start the subscription again which will update the lastRequestTime as well.

View file

@ -15,6 +15,8 @@
package software.amazon.kinesis.retrieval; package software.amazon.kinesis.retrieval;
import lombok.Getter;
import lombok.Setter;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.InitialPositionInStreamExtended;
@ -23,7 +25,11 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
/** /**
* Provides a record publisher that will retrieve records from Kinesis for processing * Provides a record publisher that will retrieve records from Kinesis for processing
*/ */
public interface RecordsPublisher extends Publisher<RecordsRetrieved> { public abstract class RecordsPublisher implements Publisher<RecordsRetrieved> {
@Getter @Setter
private String lastRequestId;
/** /**
* Initializes the publisher with where to start processing. If there is a stored sequence number the publisher will * Initializes the publisher with where to start processing. If there is a stored sequence number the publisher will
* begin from that sequence number, otherwise it will use the initial position. * begin from that sequence number, otherwise it will use the initial position.
@ -33,25 +39,25 @@ public interface RecordsPublisher extends Publisher<RecordsRetrieved> {
* @param initialPositionInStreamExtended * @param initialPositionInStreamExtended
* if there is no sequence number the initial position to use * if there is no sequence number the initial position to use
*/ */
void start(ExtendedSequenceNumber extendedSequenceNumber, InitialPositionInStreamExtended initialPositionInStreamExtended); public abstract void start(ExtendedSequenceNumber extendedSequenceNumber, InitialPositionInStreamExtended initialPositionInStreamExtended);
/** /**
* Restart from the last accepted and processed * Restart from the last accepted and processed
* @param recordsRetrieved the processRecordsInput to restart from * @param recordsRetrieved the processRecordsInput to restart from
*/ */
void restartFrom(RecordsRetrieved recordsRetrieved); public abstract void restartFrom(RecordsRetrieved recordsRetrieved);
/** /**
* Shutdowns the publisher. Once this method returns the publisher should no longer provide any records. * Shutdowns the publisher. Once this method returns the publisher should no longer provide any records.
*/ */
void shutdown(); public abstract void shutdown();
/** /**
* Notify the publisher on receipt of a data event. * Notify the publisher on receipt of a data event.
* @param ack acknowledgement received from the subscriber. * @param ack acknowledgement received from the subscriber.
*/ */
default void notify(RecordsDeliveryAck ack) { public void notify(RecordsDeliveryAck ack) {
throw new UnsupportedOperationException("RecordsPublisher does not support acknowledgement from Subscriber"); throw new UnsupportedOperationException("RecordsPublisher does not support acknowledgement from Subscriber");
} }
} }

View file

@ -61,7 +61,7 @@ import static software.amazon.kinesis.common.DiagnosticUtils.takeDelayedDelivery
@RequiredArgsConstructor @RequiredArgsConstructor
@Slf4j @Slf4j
@KinesisClientInternalApi @KinesisClientInternalApi
public class FanOutRecordsPublisher implements RecordsPublisher { public class FanOutRecordsPublisher extends RecordsPublisher {
private static final ThrowableCategory ACQUIRE_TIMEOUT_CATEGORY = new ThrowableCategory( private static final ThrowableCategory ACQUIRE_TIMEOUT_CATEGORY = new ThrowableCategory(
ThrowableType.ACQUIRE_TIMEOUT); ThrowableType.ACQUIRE_TIMEOUT);
private static final ThrowableCategory READ_TIMEOUT_CATEGORY = new ThrowableCategory(ThrowableType.READ_TIMEOUT); private static final ThrowableCategory READ_TIMEOUT_CATEGORY = new ThrowableCategory(ThrowableType.READ_TIMEOUT);
@ -204,8 +204,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
subscriber.onNext(recordsRetrieved); subscriber.onNext(recordsRetrieved);
} }
} catch (IllegalStateException e) { } catch (IllegalStateException e) {
log.warn("{}: Unable to enqueue the payload due to capacity restrictions in delivery queue with remaining capacity {} ", log.warn("{}: Unable to enqueue the payload due to capacity restrictions in delivery queue with remaining capacity {}. rid {}",
shardId, recordsDeliveryQueue.remainingCapacity()); shardId, recordsDeliveryQueue.remainingCapacity(), getLastRequestId());
throw e; throw e;
} catch (Throwable t) { } catch (Throwable t) {
log.error("{}: Unable to deliver event to the shard consumer.", shardId, t); log.error("{}: Unable to deliver event to the shard consumer.", shardId, t);
@ -288,12 +288,12 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
if (!hasValidSubscriber()) { if (!hasValidSubscriber()) {
if(hasValidFlow()) { if(hasValidFlow()) {
log.warn( log.warn(
"{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ {} id: {} -- Subscriber is null", "{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ {} id: {} -- Subscriber is null. rid {}",
shardId, flow.connectionStartedAt, flow.subscribeToShardId); shardId, flow.connectionStartedAt, flow.subscribeToShardId, getLastRequestId());
} else { } else {
log.warn( log.warn(
"{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) -- Subscriber and flow are null", "{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) -- Subscriber and flow are null. rid {}",
shardId); shardId, getLastRequestId());
} }
return; return;
} }
@ -304,8 +304,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
if (isActiveFlow(triggeringFlow)) { if (isActiveFlow(triggeringFlow)) {
if (flow != null) { if (flow != null) {
String logMessage = String.format( String logMessage = String.format(
"%s: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ %s id: %s -- %s", "%s: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ %s id: %s -- %s. rid {}",
shardId, flow.connectionStartedAt, flow.subscribeToShardId, category.throwableTypeString); shardId, flow.connectionStartedAt, flow.subscribeToShardId, category.throwableTypeString, getLastRequestId());
switch (category.throwableType) { switch (category.throwableType) {
case READ_TIMEOUT: case READ_TIMEOUT:
log.debug(logMessage, propagationThrowable); log.debug(logMessage, propagationThrowable);
@ -329,7 +329,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
try { try {
handleFlowError(propagationThrowable, triggeringFlow); handleFlowError(propagationThrowable, triggeringFlow);
} catch (Throwable innerThrowable) { } catch (Throwable innerThrowable) {
log.warn("{}: Exception while calling subscriber.onError", shardId, innerThrowable); log.warn("{}: Exception while calling subscriber.onError. rid {}", shardId, innerThrowable, getLastRequestId());
} }
subscriber = null; subscriber = null;
flow = null; flow = null;
@ -351,7 +351,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
// Clear any lingering records in the queue. // Clear any lingering records in the queue.
if (!recordsDeliveryQueue.isEmpty()) { if (!recordsDeliveryQueue.isEmpty()) {
log.warn("{}: Found non-empty queue while starting subscription. This indicates unsuccessful clean up of" log.warn("{}: Found non-empty queue while starting subscription. This indicates unsuccessful clean up of"
+ "previous subscription - {} ", shardId, subscribeToShardId); + "previous subscription - {}. rid {}", shardId, subscribeToShardId, getLastRequestId());
recordsDeliveryQueue.clear(); recordsDeliveryQueue.clear();
} }
} }
@ -461,7 +461,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
try { try {
bufferCurrentEventAndScheduleIfRequired(recordsRetrieved, triggeringFlow); bufferCurrentEventAndScheduleIfRequired(recordsRetrieved, triggeringFlow);
} catch (Throwable t) { } catch (Throwable t) {
log.warn("{}: Unable to buffer or schedule onNext for subscriber. Failing publisher.", shardId); log.warn("{}: Unable to buffer or schedule onNext for subscriber. Failing publisher. rid {}", shardId, getLastRequestId());
errorOccurred(triggeringFlow, t); errorOccurred(triggeringFlow, t);
} }
} }
@ -557,8 +557,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
synchronized (lockObject) { synchronized (lockObject) {
if (subscriber != s) { if (subscriber != s) {
log.warn( log.warn(
"{}: (FanOutRecordsPublisher/Subscription#request) - Rejected an attempt to request({}), because subscribers don't match.", "{}: (FanOutRecordsPublisher/Subscription#request) - Rejected an attempt to request({}), because subscribers don't match. rid {}",
shardId, n); shardId, n, getLastRequestId());
return; return;
} }
if (flow == null) { if (flow == null) {
@ -584,14 +584,14 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
synchronized (lockObject) { synchronized (lockObject) {
if (subscriber != s) { if (subscriber != s) {
log.warn( log.warn(
"{}: (FanOutRecordsPublisher/Subscription#cancel) - Rejected attempt to cancel subscription, because subscribers don't match.", "{}: (FanOutRecordsPublisher/Subscription#cancel) - Rejected attempt to cancel subscription, because subscribers don't match. rid {}",
shardId); shardId, getLastRequestId());
return; return;
} }
if (!hasValidSubscriber()) { if (!hasValidSubscriber()) {
log.warn( log.warn(
"{}: (FanOutRecordsPublisher/Subscription#cancel) - Cancelled called even with an invalid subscriber", "{}: (FanOutRecordsPublisher/Subscription#cancel) - Cancelled called even with an invalid subscriber. rid {}",
shardId); shardId, getLastRequestId());
} }
subscriber = null; subscriber = null;
if (flow != null) { if (flow != null) {
@ -718,7 +718,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
@Override @Override
public void responseReceived(SubscribeToShardResponse response) { public void responseReceived(SubscribeToShardResponse response) {
log.debug("{}: [SubscriptionLifetime]: (RecordFlow#responseReceived) @ {} id: {} -- Response received. rid: {}, erid: {}, sdkfields: {}", parent.setLastRequestId(response.responseMetadata().requestId());
log.debug("{}: [SubscriptionLifetime]: (RecordFlow#responseReceived) @ {} id: {} -- Response received. rid: {}, erid: {}, sdkfields: {}",
parent.shardId, connectionStartedAt, subscribeToShardId, response.responseMetadata().requestId(), response.responseMetadata().extendedRequestId(), response.sdkFields()); parent.shardId, connectionStartedAt, subscribeToShardId, response.responseMetadata().requestId(), response.responseMetadata().extendedRequestId(), response.sdkFields());
} }
@ -781,10 +782,11 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
.add(new RecordsRetrievedContext(Either.right(subscriptionShutdownEvent), this, Instant.now())); .add(new RecordsRetrievedContext(Either.right(subscriptionShutdownEvent), this, Instant.now()));
} catch (Exception e) { } catch (Exception e) {
log.warn( log.warn(
"{}: Unable to enqueue the {} shutdown event due to capacity restrictions in delivery queue with remaining capacity {}. Ignoring. ", "{}: Unable to enqueue the {} shutdown event due to capacity restrictions in delivery queue with remaining capacity {}. Ignoring. rid {}",
parent.shardId, subscriptionShutdownEvent.getEventIdentifier(), parent.shardId, subscriptionShutdownEvent.getEventIdentifier(),
parent.recordsDeliveryQueue.remainingCapacity(), parent.recordsDeliveryQueue.remainingCapacity(),
subscriptionShutdownEvent.getShutdownEventThrowableOptional()); subscriptionShutdownEvent.getShutdownEventThrowableOptional(),
parent.getLastRequestId());
} }
} }
@ -800,13 +802,13 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
// the // the
// subscription, which was cancelled for a reason (usually queue overflow). // subscription, which was cancelled for a reason (usually queue overflow).
// //
log.warn("{}: complete called on a cancelled subscription. Ignoring completion", parent.shardId); log.warn("{}: complete called on a cancelled subscription. Ignoring completion. rid {}", parent.shardId, parent.getLastRequestId());
return; return;
} }
if (this.isDisposed) { if (this.isDisposed) {
log.warn( log.warn(
"{}: [SubscriptionLifetime]: (RecordFlow#complete) @ {} id: {} -- This flow has been disposed not dispatching completion", "{}: [SubscriptionLifetime]: (RecordFlow#complete) @ {} id: {} -- This flow has been disposed not dispatching completion. rid {}",
parent.shardId, connectionStartedAt, subscribeToShardId); parent.shardId, connectionStartedAt, subscribeToShardId, parent.getLastRequestId());
return; return;
} }

View file

@ -35,7 +35,7 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
* GetRecordsRetrievalStrategy class. * GetRecordsRetrievalStrategy class.
*/ */
@KinesisClientInternalApi @KinesisClientInternalApi
public class BlockingRecordsPublisher implements RecordsPublisher { public class BlockingRecordsPublisher extends RecordsPublisher {
private final int maxRecordsPerCall; private final int maxRecordsPerCall;
private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;

View file

@ -76,7 +76,7 @@ import static software.amazon.kinesis.common.DiagnosticUtils.takeDelayedDelivery
*/ */
@Slf4j @Slf4j
@KinesisClientInternalApi @KinesisClientInternalApi
public class PrefetchRecordsPublisher implements RecordsPublisher { public class PrefetchRecordsPublisher extends RecordsPublisher {
private static final String EXPIRED_ITERATOR_METRIC = "ExpiredIterator"; private static final String EXPIRED_ITERATOR_METRIC = "ExpiredIterator";
private int maxPendingProcessRecordsInput; private int maxPendingProcessRecordsInput;
private int maxByteSize; private int maxByteSize;

View file

@ -490,7 +490,7 @@ public class ShardConsumerSubscriberTest {
} }
} }
private class TestPublisher implements RecordsPublisher { private class TestPublisher extends RecordsPublisher {
private final LinkedList<ResponseItem> responses = new LinkedList<>(); private final LinkedList<ResponseItem> responses = new LinkedList<>();
protected volatile long requested = 0; protected volatile long requested = 0;

View file

@ -168,7 +168,7 @@ public class ShardConsumerTest {
assertThat(remainder.isEmpty(), equalTo(true)); assertThat(remainder.isEmpty(), equalTo(true));
} }
private class TestPublisher implements RecordsPublisher { private class TestPublisher extends RecordsPublisher {
final CyclicBarrier barrier = new CyclicBarrier(2); final CyclicBarrier barrier = new CyclicBarrier(2);
final CyclicBarrier requestBarrier = new CyclicBarrier(2); final CyclicBarrier requestBarrier = new CyclicBarrier(2);