From 0d45170734429e7414f4a89d28bfd35e40a47cb7 Mon Sep 17 00:00:00 2001 From: jushkem <20001595+jushkem@users.noreply.github.com> Date: Tue, 4 Feb 2020 12:09:54 -0800 Subject: [PATCH] Addressing PR comments. --- .../amazon/kinesis/common/RequestDetails.java | 17 ++++ .../lifecycle/ShardConsumerSubscriber.java | 4 +- .../kinesis/retrieval/RecordsPublisher.java | 47 +++++++--- .../fanout/FanOutRecordsPublisher.java | 90 ++++++++++++------- .../polling/BlockingRecordsPublisher.java | 19 +++- .../polling/PrefetchRecordsPublisher.java | 19 +++- .../ShardConsumerSubscriberTest.java | 19 +++- .../kinesis/lifecycle/ShardConsumerTest.java | 18 +++- 8 files changed, 186 insertions(+), 47 deletions(-) create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/RequestDetails.java diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/RequestDetails.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/RequestDetails.java new file mode 100644 index 00000000..75b38032 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/RequestDetails.java @@ -0,0 +1,17 @@ +package software.amazon.kinesis.common; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.experimental.Accessors; +import software.amazon.awssdk.services.kinesis.model.KinesisResponseMetadata; + +import java.time.Instant; +import java.util.Optional; + +@Accessors(fluent = true) +@Getter +@AllArgsConstructor +public class RequestDetails { + private final String requestId; + private final String timestamp; +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java index ef696e18..3783d726 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java @@ -129,8 +129,8 @@ class ShardConsumerSubscriber implements Subscriber { Duration timeSinceLastResponse = Duration.between(lastRequestTime, now); if (timeSinceLastResponse.toMillis() > maxTimeBetweenRequests) { log.error( - "{}: Last request was dispatched at {}, but no response as of {} ({}). Cancelling subscription, and restarting.", - shardConsumer.shardInfo().shardId(), lastRequestTime, now, timeSinceLastResponse, recordsPublisher.getLastRequestId()); + "{}: Last request was dispatched at {}, but no response as of {} ({}). Cancelling subscription, and restarting. Last successful response: RequestId - {}, Timestamp - {}", + shardConsumer.shardInfo().shardId(), lastRequestTime, now, timeSinceLastResponse, recordsPublisher.getLastSuccessfulResponseRequestId(), recordsPublisher.getLastSuccessfulResponseTimestamp()); cancel(); // Start the subscription again which will update the lastRequestTime as well. diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsPublisher.java index 891ff6e5..63cd0588 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsPublisher.java @@ -15,49 +15,74 @@ package software.amazon.kinesis.retrieval; -import lombok.Getter; -import lombok.Setter; import org.reactivestreams.Publisher; import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.common.RequestDetails; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; +import java.util.Optional; + /** * Provides a record publisher that will retrieve records from Kinesis for processing */ -public abstract class RecordsPublisher implements Publisher { +public interface RecordsPublisher extends Publisher { - @Getter @Setter - private String lastRequestId; + /** + * Placeholder for logging when no successful request has been made. + */ + String NONE = "NONE"; /** * Initializes the publisher with where to start processing. If there is a stored sequence number the publisher will * begin from that sequence number, otherwise it will use the initial position. - * + * * @param extendedSequenceNumber * the sequence number to start processing from * @param initialPositionInStreamExtended * if there is no sequence number the initial position to use */ - public abstract void start(ExtendedSequenceNumber extendedSequenceNumber, InitialPositionInStreamExtended initialPositionInStreamExtended); + void start(ExtendedSequenceNumber extendedSequenceNumber, InitialPositionInStreamExtended initialPositionInStreamExtended); /** * Restart from the last accepted and processed * @param recordsRetrieved the processRecordsInput to restart from */ - public abstract void restartFrom(RecordsRetrieved recordsRetrieved); - + void restartFrom(RecordsRetrieved recordsRetrieved); + /** * Shutdowns the publisher. Once this method returns the publisher should no longer provide any records. */ - public abstract void shutdown(); + void shutdown(); + + /** + * Gets last successful response details. + * + * @return details associated with last successful response. + */ + Optional getLastSuccessfulResponseDetails(); + + /** + * Gets last successful response's request id. + * + * @return requestId associated with last succesful response. + */ + String getLastSuccessfulResponseRequestId(); + + /** + * Gets last successful response's timestamp. + * + * @return timestamp associated with last successful response. + */ + String getLastSuccessfulResponseTimestamp(); /** * Notify the publisher on receipt of a data event. + * * @param ack acknowledgement received from the subscriber. */ - public void notify(RecordsDeliveryAck ack) { + default void notify(RecordsDeliveryAck ack) { throw new UnsupportedOperationException("RecordsPublisher does not support acknowledgement from Subscriber"); } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java index 9453d3dc..4e8e2205 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java @@ -37,6 +37,7 @@ import software.amazon.awssdk.utils.Either; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.KinesisRequestsBuilder; +import software.amazon.kinesis.common.RequestDetails; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.retrieval.BatchUniqueIdentifier; import software.amazon.kinesis.retrieval.IteratorBuilder; @@ -50,6 +51,7 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import java.time.Instant; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -61,7 +63,7 @@ import static software.amazon.kinesis.common.DiagnosticUtils.takeDelayedDelivery @RequiredArgsConstructor @Slf4j @KinesisClientInternalApi -public class FanOutRecordsPublisher extends RecordsPublisher { +public class FanOutRecordsPublisher implements RecordsPublisher { private static final ThrowableCategory ACQUIRE_TIMEOUT_CATEGORY = new ThrowableCategory( ThrowableType.ACQUIRE_TIMEOUT); private static final ThrowableCategory READ_TIMEOUT_CATEGORY = new ThrowableCategory(ThrowableType.READ_TIMEOUT); @@ -87,6 +89,8 @@ public class FanOutRecordsPublisher extends RecordsPublisher { private BlockingQueue recordsDeliveryQueue = new LinkedBlockingQueue<>( MAX_EVENT_BURST_FROM_SERVICE); + private Optional lastSuccessfulRequestDetails = Optional.empty(); + @Override public void start(ExtendedSequenceNumber extendedSequenceNumber, InitialPositionInStreamExtended initialPositionInStreamExtended) { @@ -143,6 +147,25 @@ public class FanOutRecordsPublisher extends RecordsPublisher { } } + @Override + public Optional getLastSuccessfulResponseDetails() { + return lastSuccessfulRequestDetails; + } + + public void setLastSuccessfulResponseDetails(RequestDetails requestDetails) { + lastSuccessfulRequestDetails = Optional.of(requestDetails); + } + + @Override + public String getLastSuccessfulResponseRequestId() { + return getLastSuccessfulResponseDetails().map(RequestDetails::requestId).orElse(NONE); + } + + @Override + public String getLastSuccessfulResponseTimestamp() { + return getLastSuccessfulResponseDetails().map(RequestDetails::timestamp).orElse(NONE); + } + // This method is not thread-safe. You need to acquire a lock in the caller in order to execute this. @VisibleForTesting RecordFlow evictAckedEventAndScheduleNextEvent(RecordsDeliveryAck recordsDeliveryAck) { @@ -204,8 +227,9 @@ public class FanOutRecordsPublisher extends RecordsPublisher { subscriber.onNext(recordsRetrieved); } } catch (IllegalStateException e) { - log.warn("{}: Unable to enqueue the payload due to capacity restrictions in delivery queue with remaining capacity {}. rid {}", - shardId, recordsDeliveryQueue.remainingCapacity(), getLastRequestId()); + + log.warn("{}: Unable to enqueue the payload due to capacity restrictions in delivery queue with remaining capacity {}. Last successful response: RequestId - {}, Timestamp - {}", + shardId, recordsDeliveryQueue.remainingCapacity(), getLastSuccessfulResponseRequestId(), getLastSuccessfulResponseTimestamp()); throw e; } catch (Throwable t) { log.error("{}: Unable to deliver event to the shard consumer.", shardId, t); @@ -288,12 +312,14 @@ public class FanOutRecordsPublisher extends RecordsPublisher { if (!hasValidSubscriber()) { if(hasValidFlow()) { log.warn( - "{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ {} id: {} -- Subscriber is null. rid {}", - shardId, flow.connectionStartedAt, flow.subscribeToShardId, getLastRequestId()); + "{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ {} id: {} -- Subscriber is null." + + " Last successful response: RequestId - {}, Timestamp - {}", + shardId, flow.connectionStartedAt, flow.subscribeToShardId, getLastSuccessfulResponseRequestId(), getLastSuccessfulResponseTimestamp()); } else { log.warn( - "{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) -- Subscriber and flow are null. rid {}", - shardId, getLastRequestId()); + "{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) -- Subscriber and flow are null." + + " Last successful response: RequestId - {}, Timestamp - {}", + shardId, getLastSuccessfulResponseRequestId(), getLastSuccessfulResponseTimestamp()); } return; } @@ -304,8 +330,9 @@ public class FanOutRecordsPublisher extends RecordsPublisher { if (isActiveFlow(triggeringFlow)) { if (flow != null) { String logMessage = String.format( - "%s: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ %s id: %s -- %s. rid {}", - shardId, flow.connectionStartedAt, flow.subscribeToShardId, category.throwableTypeString, getLastRequestId()); + "%s: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ %s id: %s -- %s." + + " Last successful response: RequestId - {}, Timestamp - {}", + shardId, flow.connectionStartedAt, flow.subscribeToShardId, category.throwableTypeString, getLastSuccessfulResponseRequestId(), getLastSuccessfulResponseTimestamp()); switch (category.throwableType) { case READ_TIMEOUT: log.debug(logMessage, propagationThrowable); @@ -329,7 +356,8 @@ public class FanOutRecordsPublisher extends RecordsPublisher { try { handleFlowError(propagationThrowable, triggeringFlow); } catch (Throwable innerThrowable) { - log.warn("{}: Exception while calling subscriber.onError. rid {}", shardId, innerThrowable, getLastRequestId()); + log.warn("{}: Exception while calling subscriber.onError. Last successful response:" + + " RequestId - {}, Timestamp - {}", shardId, innerThrowable, getLastSuccessfulResponseRequestId(), getLastSuccessfulResponseTimestamp()); } subscriber = null; flow = null; @@ -351,7 +379,7 @@ public class FanOutRecordsPublisher extends RecordsPublisher { // Clear any lingering records in the queue. if (!recordsDeliveryQueue.isEmpty()) { log.warn("{}: Found non-empty queue while starting subscription. This indicates unsuccessful clean up of" - + "previous subscription - {}. rid {}", shardId, subscribeToShardId, getLastRequestId()); + + "previous subscription - {}. Last successful response: RequestId - {}, Timestamp - {}", shardId, subscribeToShardId, getLastSuccessfulResponseRequestId(), getLastSuccessfulResponseTimestamp()); recordsDeliveryQueue.clear(); } } @@ -461,7 +489,8 @@ public class FanOutRecordsPublisher extends RecordsPublisher { try { bufferCurrentEventAndScheduleIfRequired(recordsRetrieved, triggeringFlow); } catch (Throwable t) { - log.warn("{}: Unable to buffer or schedule onNext for subscriber. Failing publisher. rid {}", shardId, getLastRequestId()); + log.warn("{}: Unable to buffer or schedule onNext for subscriber. Failing publisher." + + " Last successful response: RequestId - {}, Timestamp - {}", shardId, getLastSuccessfulResponseRequestId(), getLastSuccessfulResponseTimestamp()); errorOccurred(triggeringFlow, t); } } @@ -557,8 +586,8 @@ public class FanOutRecordsPublisher extends RecordsPublisher { synchronized (lockObject) { if (subscriber != s) { log.warn( - "{}: (FanOutRecordsPublisher/Subscription#request) - Rejected an attempt to request({}), because subscribers don't match. rid {}", - shardId, n, getLastRequestId()); + "{}: (FanOutRecordsPublisher/Subscription#request) - Rejected an attempt to request({}), because subscribers don't match. Last successful response: RequestId - {}, Timestamp - {}", + shardId, n, getLastSuccessfulResponseRequestId(), getLastSuccessfulResponseTimestamp()); return; } if (flow == null) { @@ -584,14 +613,14 @@ public class FanOutRecordsPublisher extends RecordsPublisher { synchronized (lockObject) { if (subscriber != s) { log.warn( - "{}: (FanOutRecordsPublisher/Subscription#cancel) - Rejected attempt to cancel subscription, because subscribers don't match. rid {}", - shardId, getLastRequestId()); + "{}: (FanOutRecordsPublisher/Subscription#cancel) - Rejected attempt to cancel subscription, because subscribers don't match. Last successful response: RequestId - {}, Timestamp - {}", + shardId, getLastSuccessfulResponseRequestId(), getLastSuccessfulResponseTimestamp()); return; } if (!hasValidSubscriber()) { log.warn( - "{}: (FanOutRecordsPublisher/Subscription#cancel) - Cancelled called even with an invalid subscriber. rid {}", - shardId, getLastRequestId()); + "{}: (FanOutRecordsPublisher/Subscription#cancel) - Cancelled called even with an invalid subscriber. Last successful response: RequestId - {}, Timestamp - {}", + shardId, getLastSuccessfulResponseRequestId(), getLastSuccessfulResponseTimestamp()); } subscriber = null; if (flow != null) { @@ -681,6 +710,7 @@ public class FanOutRecordsPublisher extends RecordsPublisher { private boolean isDisposed = false; private boolean isErrorDispatched = false; private boolean isCancelled = false; + private RequestDetails lastSuccessfulResponseDetails; @Override public void onEventStream(SdkPublisher publisher) { @@ -718,9 +748,11 @@ public class FanOutRecordsPublisher extends RecordsPublisher { @Override public void responseReceived(SubscribeToShardResponse response) { - parent.setLastRequestId(response.responseMetadata().requestId()); - log.debug("{}: [SubscriptionLifetime]: (RecordFlow#responseReceived) @ {} id: {} -- Response received. rid: {}, erid: {}, sdkfields: {}", - parent.shardId, connectionStartedAt, subscribeToShardId, response.responseMetadata().requestId(), response.responseMetadata().extendedRequestId(), response.sdkFields()); + log.debug("{}: [SubscriptionLifetime]: (RecordFlow#responseReceived) @ {} id: {} -- Response received. Last successful response: RequestId - {}, Timestamp - {}, ExtendedLast successful response: RequestId - {}, Timestamp - {}", + parent.shardId, connectionStartedAt, subscribeToShardId, response.responseMetadata().requestId(), response.responseMetadata().extendedRequestId(), connectionStartedAt); + + lastSuccessfulResponseDetails = new RequestDetails(response.responseMetadata().requestId(), connectionStartedAt.toString()); + parent.setLastSuccessfulResponseDetails(lastSuccessfulResponseDetails); } @Override @@ -782,11 +814,9 @@ public class FanOutRecordsPublisher extends RecordsPublisher { .add(new RecordsRetrievedContext(Either.right(subscriptionShutdownEvent), this, Instant.now())); } catch (Exception e) { log.warn( - "{}: Unable to enqueue the {} shutdown event due to capacity restrictions in delivery queue with remaining capacity {}. Ignoring. rid {}", - parent.shardId, subscriptionShutdownEvent.getEventIdentifier(), - parent.recordsDeliveryQueue.remainingCapacity(), - subscriptionShutdownEvent.getShutdownEventThrowableOptional(), - parent.getLastRequestId()); + "{}: Unable to enqueue the {} shutdown event due to capacity restrictions in delivery queue with remaining capacity {}. Ignoring. Last successful response: RequestId - {}, Timestamp - {}", + parent.shardId, subscriptionShutdownEvent.getEventIdentifier(), parent.recordsDeliveryQueue.remainingCapacity(), + subscriptionShutdownEvent.getShutdownEventThrowableOptional(), parent.getLastSuccessfulResponseRequestId(), parent.getLastSuccessfulResponseTimestamp()); } } @@ -802,13 +832,14 @@ public class FanOutRecordsPublisher extends RecordsPublisher { // the // subscription, which was cancelled for a reason (usually queue overflow). // - log.warn("{}: complete called on a cancelled subscription. Ignoring completion. rid {}", parent.shardId, parent.getLastRequestId()); + log.warn("{}: complete called on a cancelled subscription. Ignoring completion. Last successful response: RequestId - {}, Timestamp - {}", + parent.shardId, parent.getLastSuccessfulResponseRequestId(), parent.getLastSuccessfulResponseTimestamp()); return; } if (this.isDisposed) { log.warn( - "{}: [SubscriptionLifetime]: (RecordFlow#complete) @ {} id: {} -- This flow has been disposed not dispatching completion. rid {}", - parent.shardId, connectionStartedAt, subscribeToShardId, parent.getLastRequestId()); + "{}: [SubscriptionLifetime]: (RecordFlow#complete) @ {} id: {} -- This flow has been disposed not dispatching completion. Last successful response: RequestId - {}, Timestamp - {}", + parent.shardId, connectionStartedAt, subscribeToShardId, parent.getLastSuccessfulResponseRequestId(), parent.getLastSuccessfulResponseTimestamp()); return; } @@ -944,7 +975,6 @@ public class FanOutRecordsPublisher extends RecordsPublisher { log.debug( "{}: [SubscriptionLifetime]: (RecordSubscription#onComplete) @ {} id: {} -- Allowing RecordFlow to call onComplete", parent.shardId, connectionStartedAt, subscribeToShardId); - } } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/BlockingRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/BlockingRecordsPublisher.java index f45be39e..7cdea6c9 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/BlockingRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/BlockingRecordsPublisher.java @@ -16,6 +16,7 @@ package software.amazon.kinesis.retrieval.polling; import java.util.List; +import java.util.Optional; import java.util.stream.Collectors; import org.reactivestreams.Subscriber; @@ -23,6 +24,7 @@ import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; +import software.amazon.kinesis.common.RequestDetails; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; import software.amazon.kinesis.retrieval.KinesisClientRecord; @@ -35,7 +37,7 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; * GetRecordsRetrievalStrategy class. */ @KinesisClientInternalApi -public class BlockingRecordsPublisher extends RecordsPublisher { +public class BlockingRecordsPublisher implements RecordsPublisher { private final int maxRecordsPerCall; private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; @@ -70,6 +72,21 @@ public class BlockingRecordsPublisher extends RecordsPublisher { getRecordsRetrievalStrategy.shutdown(); } + @Override + public Optional getLastSuccessfulResponseDetails() { + return Optional.empty(); + } + + @Override + public String getLastSuccessfulResponseRequestId() { + return NONE; + } + + @Override + public String getLastSuccessfulResponseTimestamp() { + return NONE; + } + @Override public void subscribe(Subscriber s) { subscriber = s; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java index 6538bd26..c8202d05 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java @@ -18,6 +18,7 @@ package software.amazon.kinesis.retrieval.polling; import java.time.Duration; import java.time.Instant; import java.util.List; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; @@ -46,6 +47,7 @@ import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException; import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.common.RequestDetails; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.metrics.MetricsLevel; @@ -76,7 +78,7 @@ import static software.amazon.kinesis.common.DiagnosticUtils.takeDelayedDelivery */ @Slf4j @KinesisClientInternalApi -public class PrefetchRecordsPublisher extends RecordsPublisher { +public class PrefetchRecordsPublisher implements RecordsPublisher { private static final String EXPIRED_ITERATOR_METRIC = "ExpiredIterator"; private int maxPendingProcessRecordsInput; private int maxByteSize; @@ -260,6 +262,21 @@ public class PrefetchRecordsPublisher extends RecordsPublisher { started = false; } + @Override + public Optional getLastSuccessfulResponseDetails() { + return Optional.empty(); + } + + @Override + public String getLastSuccessfulResponseRequestId() { + return NONE; + } + + @Override + public String getLastSuccessfulResponseTimestamp() { + return NONE; + } + @Override public void restartFrom(RecordsRetrieved recordsRetrieved) { if (!(recordsRetrieved instanceof PrefetchRecordsRetrieved)) { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java index 149131d5..c98ebfe7 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java @@ -35,6 +35,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.LinkedList; import java.util.List; +import java.util.Optional; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -61,6 +62,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.common.RequestDetails; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.retrieval.KinesisClientRecord; @@ -490,7 +492,7 @@ public class ShardConsumerSubscriberTest { } } - private class TestPublisher extends RecordsPublisher { + private class TestPublisher implements RecordsPublisher { private final LinkedList responses = new LinkedList<>(); protected volatile long requested = 0; @@ -556,6 +558,21 @@ public class ShardConsumerSubscriberTest { } + @Override + public Optional getLastSuccessfulResponseDetails() { + return Optional.empty(); + } + + @Override + public String getLastSuccessfulResponseRequestId() { + return NONE; + } + + @Override + public String getLastSuccessfulResponseTimestamp() { + return NONE; + } + @Override public void subscribe(Subscriber s) { subscriber = s; diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java index 4889191a..e16e4b51 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java @@ -70,6 +70,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.extern.slf4j.Slf4j; import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.common.RequestDetails; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.TaskExecutionListenerInput; @@ -168,7 +169,7 @@ public class ShardConsumerTest { assertThat(remainder.isEmpty(), equalTo(true)); } - private class TestPublisher extends RecordsPublisher { + private class TestPublisher implements RecordsPublisher { final CyclicBarrier barrier = new CyclicBarrier(2); final CyclicBarrier requestBarrier = new CyclicBarrier(2); @@ -209,6 +210,21 @@ public class ShardConsumerTest { } + @Override + public Optional getLastSuccessfulResponseDetails() { + return Optional.empty(); + } + + @Override + public String getLastSuccessfulResponseRequestId() { + return NONE; + } + + @Override + public String getLastSuccessfulResponseTimestamp() { + return NONE; + } + @Override public void subscribe(Subscriber s) { subscriber = s;