From a459ec6499bd02e2dda3906f3c36d1f3ec24a186 Mon Sep 17 00:00:00 2001 From: jushkem <20001595+jushkem@users.noreply.github.com> Date: Thu, 23 Jan 2020 13:40:14 -0800 Subject: [PATCH] Addressing PR comments. --- .../lifecycle/ShardConsumerSubscriber.java | 2 +- .../kinesis/retrieval/RecordsPublisher.java | 16 ++++-- .../fanout/FanOutRecordsPublisher.java | 50 ++++++++++--------- .../polling/BlockingRecordsPublisher.java | 2 +- .../polling/PrefetchRecordsPublisher.java | 2 +- .../ShardConsumerSubscriberTest.java | 2 +- .../kinesis/lifecycle/ShardConsumerTest.java | 2 +- 7 files changed, 42 insertions(+), 34 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java index 14e347a2..ef696e18 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java @@ -130,7 +130,7 @@ class ShardConsumerSubscriber implements Subscriber { if (timeSinceLastResponse.toMillis() > maxTimeBetweenRequests) { log.error( "{}: Last request was dispatched at {}, but no response as of {} ({}). Cancelling subscription, and restarting.", - shardConsumer.shardInfo().shardId(), lastRequestTime, now, timeSinceLastResponse); + shardConsumer.shardInfo().shardId(), lastRequestTime, now, timeSinceLastResponse, recordsPublisher.getLastRequestId()); cancel(); // Start the subscription again which will update the lastRequestTime as well. diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsPublisher.java index cd1e04f1..891ff6e5 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsPublisher.java @@ -15,6 +15,8 @@ package software.amazon.kinesis.retrieval; +import lombok.Getter; +import lombok.Setter; import org.reactivestreams.Publisher; import software.amazon.kinesis.common.InitialPositionInStreamExtended; @@ -23,7 +25,11 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; /** * Provides a record publisher that will retrieve records from Kinesis for processing */ -public interface RecordsPublisher extends Publisher { +public abstract class RecordsPublisher implements Publisher { + + @Getter @Setter + private String lastRequestId; + /** * Initializes the publisher with where to start processing. If there is a stored sequence number the publisher will * begin from that sequence number, otherwise it will use the initial position. @@ -33,25 +39,25 @@ public interface RecordsPublisher extends Publisher { * @param initialPositionInStreamExtended * if there is no sequence number the initial position to use */ - void start(ExtendedSequenceNumber extendedSequenceNumber, InitialPositionInStreamExtended initialPositionInStreamExtended); + public abstract void start(ExtendedSequenceNumber extendedSequenceNumber, InitialPositionInStreamExtended initialPositionInStreamExtended); /** * Restart from the last accepted and processed * @param recordsRetrieved the processRecordsInput to restart from */ - void restartFrom(RecordsRetrieved recordsRetrieved); + public abstract void restartFrom(RecordsRetrieved recordsRetrieved); /** * Shutdowns the publisher. Once this method returns the publisher should no longer provide any records. */ - void shutdown(); + public abstract void shutdown(); /** * Notify the publisher on receipt of a data event. * @param ack acknowledgement received from the subscriber. */ - default void notify(RecordsDeliveryAck ack) { + public void notify(RecordsDeliveryAck ack) { throw new UnsupportedOperationException("RecordsPublisher does not support acknowledgement from Subscriber"); } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java index dbf495a0..9453d3dc 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java @@ -61,7 +61,7 @@ import static software.amazon.kinesis.common.DiagnosticUtils.takeDelayedDelivery @RequiredArgsConstructor @Slf4j @KinesisClientInternalApi -public class FanOutRecordsPublisher implements RecordsPublisher { +public class FanOutRecordsPublisher extends RecordsPublisher { private static final ThrowableCategory ACQUIRE_TIMEOUT_CATEGORY = new ThrowableCategory( ThrowableType.ACQUIRE_TIMEOUT); private static final ThrowableCategory READ_TIMEOUT_CATEGORY = new ThrowableCategory(ThrowableType.READ_TIMEOUT); @@ -204,8 +204,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher { subscriber.onNext(recordsRetrieved); } } catch (IllegalStateException e) { - log.warn("{}: Unable to enqueue the payload due to capacity restrictions in delivery queue with remaining capacity {} ", - shardId, recordsDeliveryQueue.remainingCapacity()); + log.warn("{}: Unable to enqueue the payload due to capacity restrictions in delivery queue with remaining capacity {}. rid {}", + shardId, recordsDeliveryQueue.remainingCapacity(), getLastRequestId()); throw e; } catch (Throwable t) { log.error("{}: Unable to deliver event to the shard consumer.", shardId, t); @@ -288,12 +288,12 @@ public class FanOutRecordsPublisher implements RecordsPublisher { if (!hasValidSubscriber()) { if(hasValidFlow()) { log.warn( - "{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ {} id: {} -- Subscriber is null", - shardId, flow.connectionStartedAt, flow.subscribeToShardId); + "{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ {} id: {} -- Subscriber is null. rid {}", + shardId, flow.connectionStartedAt, flow.subscribeToShardId, getLastRequestId()); } else { log.warn( - "{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) -- Subscriber and flow are null", - shardId); + "{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) -- Subscriber and flow are null. rid {}", + shardId, getLastRequestId()); } return; } @@ -304,8 +304,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher { if (isActiveFlow(triggeringFlow)) { if (flow != null) { String logMessage = String.format( - "%s: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ %s id: %s -- %s", - shardId, flow.connectionStartedAt, flow.subscribeToShardId, category.throwableTypeString); + "%s: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ %s id: %s -- %s. rid {}", + shardId, flow.connectionStartedAt, flow.subscribeToShardId, category.throwableTypeString, getLastRequestId()); switch (category.throwableType) { case READ_TIMEOUT: log.debug(logMessage, propagationThrowable); @@ -329,7 +329,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { try { handleFlowError(propagationThrowable, triggeringFlow); } catch (Throwable innerThrowable) { - log.warn("{}: Exception while calling subscriber.onError", shardId, innerThrowable); + log.warn("{}: Exception while calling subscriber.onError. rid {}", shardId, innerThrowable, getLastRequestId()); } subscriber = null; flow = null; @@ -351,7 +351,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { // Clear any lingering records in the queue. if (!recordsDeliveryQueue.isEmpty()) { log.warn("{}: Found non-empty queue while starting subscription. This indicates unsuccessful clean up of" - + "previous subscription - {} ", shardId, subscribeToShardId); + + "previous subscription - {}. rid {}", shardId, subscribeToShardId, getLastRequestId()); recordsDeliveryQueue.clear(); } } @@ -461,7 +461,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { try { bufferCurrentEventAndScheduleIfRequired(recordsRetrieved, triggeringFlow); } catch (Throwable t) { - log.warn("{}: Unable to buffer or schedule onNext for subscriber. Failing publisher.", shardId); + log.warn("{}: Unable to buffer or schedule onNext for subscriber. Failing publisher. rid {}", shardId, getLastRequestId()); errorOccurred(triggeringFlow, t); } } @@ -557,8 +557,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher { synchronized (lockObject) { if (subscriber != s) { log.warn( - "{}: (FanOutRecordsPublisher/Subscription#request) - Rejected an attempt to request({}), because subscribers don't match.", - shardId, n); + "{}: (FanOutRecordsPublisher/Subscription#request) - Rejected an attempt to request({}), because subscribers don't match. rid {}", + shardId, n, getLastRequestId()); return; } if (flow == null) { @@ -584,14 +584,14 @@ public class FanOutRecordsPublisher implements RecordsPublisher { synchronized (lockObject) { if (subscriber != s) { log.warn( - "{}: (FanOutRecordsPublisher/Subscription#cancel) - Rejected attempt to cancel subscription, because subscribers don't match.", - shardId); + "{}: (FanOutRecordsPublisher/Subscription#cancel) - Rejected attempt to cancel subscription, because subscribers don't match. rid {}", + shardId, getLastRequestId()); return; } if (!hasValidSubscriber()) { log.warn( - "{}: (FanOutRecordsPublisher/Subscription#cancel) - Cancelled called even with an invalid subscriber", - shardId); + "{}: (FanOutRecordsPublisher/Subscription#cancel) - Cancelled called even with an invalid subscriber. rid {}", + shardId, getLastRequestId()); } subscriber = null; if (flow != null) { @@ -718,7 +718,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher { @Override public void responseReceived(SubscribeToShardResponse response) { - log.debug("{}: [SubscriptionLifetime]: (RecordFlow#responseReceived) @ {} id: {} -- Response received. rid: {}, erid: {}, sdkfields: {}", + parent.setLastRequestId(response.responseMetadata().requestId()); + log.debug("{}: [SubscriptionLifetime]: (RecordFlow#responseReceived) @ {} id: {} -- Response received. rid: {}, erid: {}, sdkfields: {}", parent.shardId, connectionStartedAt, subscribeToShardId, response.responseMetadata().requestId(), response.responseMetadata().extendedRequestId(), response.sdkFields()); } @@ -781,10 +782,11 @@ public class FanOutRecordsPublisher implements RecordsPublisher { .add(new RecordsRetrievedContext(Either.right(subscriptionShutdownEvent), this, Instant.now())); } catch (Exception e) { log.warn( - "{}: Unable to enqueue the {} shutdown event due to capacity restrictions in delivery queue with remaining capacity {}. Ignoring. ", + "{}: Unable to enqueue the {} shutdown event due to capacity restrictions in delivery queue with remaining capacity {}. Ignoring. rid {}", parent.shardId, subscriptionShutdownEvent.getEventIdentifier(), parent.recordsDeliveryQueue.remainingCapacity(), - subscriptionShutdownEvent.getShutdownEventThrowableOptional()); + subscriptionShutdownEvent.getShutdownEventThrowableOptional(), + parent.getLastRequestId()); } } @@ -800,13 +802,13 @@ public class FanOutRecordsPublisher implements RecordsPublisher { // the // subscription, which was cancelled for a reason (usually queue overflow). // - log.warn("{}: complete called on a cancelled subscription. Ignoring completion", parent.shardId); + log.warn("{}: complete called on a cancelled subscription. Ignoring completion. rid {}", parent.shardId, parent.getLastRequestId()); return; } if (this.isDisposed) { log.warn( - "{}: [SubscriptionLifetime]: (RecordFlow#complete) @ {} id: {} -- This flow has been disposed not dispatching completion", - parent.shardId, connectionStartedAt, subscribeToShardId); + "{}: [SubscriptionLifetime]: (RecordFlow#complete) @ {} id: {} -- This flow has been disposed not dispatching completion. rid {}", + parent.shardId, connectionStartedAt, subscribeToShardId, parent.getLastRequestId()); return; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/BlockingRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/BlockingRecordsPublisher.java index d6df725a..f45be39e 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/BlockingRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/BlockingRecordsPublisher.java @@ -35,7 +35,7 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; * GetRecordsRetrievalStrategy class. */ @KinesisClientInternalApi -public class BlockingRecordsPublisher implements RecordsPublisher { +public class BlockingRecordsPublisher extends RecordsPublisher { private final int maxRecordsPerCall; private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java index 3f28ddf4..6538bd26 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java @@ -76,7 +76,7 @@ import static software.amazon.kinesis.common.DiagnosticUtils.takeDelayedDelivery */ @Slf4j @KinesisClientInternalApi -public class PrefetchRecordsPublisher implements RecordsPublisher { +public class PrefetchRecordsPublisher extends RecordsPublisher { private static final String EXPIRED_ITERATOR_METRIC = "ExpiredIterator"; private int maxPendingProcessRecordsInput; private int maxByteSize; diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java index 4d0f01ee..149131d5 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java @@ -490,7 +490,7 @@ public class ShardConsumerSubscriberTest { } } - private class TestPublisher implements RecordsPublisher { + private class TestPublisher extends RecordsPublisher { private final LinkedList responses = new LinkedList<>(); protected volatile long requested = 0; diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java index e5db170b..4889191a 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java @@ -168,7 +168,7 @@ public class ShardConsumerTest { assertThat(remainder.isEmpty(), equalTo(true)); } - private class TestPublisher implements RecordsPublisher { + private class TestPublisher extends RecordsPublisher { final CyclicBarrier barrier = new CyclicBarrier(2); final CyclicBarrier requestBarrier = new CyclicBarrier(2);