diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/RequestDetails.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/RequestDetails.java new file mode 100644 index 00000000..ca14155e --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/RequestDetails.java @@ -0,0 +1,52 @@ +package software.amazon.kinesis.common; + +import lombok.experimental.Accessors; + +import java.util.Optional; + +@Accessors(fluent=true) +public class RequestDetails { + + /** + * Placeholder for logging when no successful request has been made. + */ + private static final String NONE = "NONE"; + + private final Optional requestId; + private final Optional timestamp; + + public RequestDetails() { + this.requestId = Optional.empty(); + this.timestamp = Optional.empty(); + } + + public RequestDetails(String requestId, String timestamp) { + this.requestId = Optional.of(requestId); + this.timestamp = Optional.of(timestamp); + } + + /** + * Gets last successful request's request id. + * + * @return requestId associated with last successful request. + */ + public String getRequestId() { + return requestId.orElse(NONE); + } + + /** + * Gets last successful request's timestamp. + * + * @return timestamp associated with last successful request. + */ + public String getTimestamp() { + return timestamp.orElse(NONE); + } + + @Override + public String toString() { + return String.format("request id - %s, timestamp - %s", getRequestId(), getTimestamp()); + } + +} + diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java index 14e347a2..4c05ac94 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java @@ -129,8 +129,8 @@ class ShardConsumerSubscriber implements Subscriber { Duration timeSinceLastResponse = Duration.between(lastRequestTime, now); if (timeSinceLastResponse.toMillis() > maxTimeBetweenRequests) { log.error( - "{}: Last request was dispatched at {}, but no response as of {} ({}). Cancelling subscription, and restarting.", - shardConsumer.shardInfo().shardId(), lastRequestTime, now, timeSinceLastResponse); + "{}: Last request was dispatched at {}, but no response as of {} ({}). Cancelling subscription, and restarting. Last successful request details -- {}", + shardConsumer.shardInfo().shardId(), lastRequestTime, now, timeSinceLastResponse, recordsPublisher.getLastSuccessfulRequestDetails()); cancel(); // Start the subscription again which will update the lastRequestTime as well. diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsPublisher.java index cd1e04f1..5fc029b4 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsPublisher.java @@ -18,16 +18,22 @@ package software.amazon.kinesis.retrieval; import org.reactivestreams.Publisher; import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.common.RequestDetails; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; +import java.util.Optional; + /** * Provides a record publisher that will retrieve records from Kinesis for processing */ public interface RecordsPublisher extends Publisher { + + + /** * Initializes the publisher with where to start processing. If there is a stored sequence number the publisher will * begin from that sequence number, otherwise it will use the initial position. - * + * * @param extendedSequenceNumber * the sequence number to start processing from * @param initialPositionInStreamExtended @@ -40,15 +46,23 @@ public interface RecordsPublisher extends Publisher { * @param recordsRetrieved the processRecordsInput to restart from */ void restartFrom(RecordsRetrieved recordsRetrieved); - + /** * Shutdowns the publisher. Once this method returns the publisher should no longer provide any records. */ void shutdown(); + /** + * Gets last successful request details. + * + * @return details associated with last successful request. + */ + RequestDetails getLastSuccessfulRequestDetails(); + /** * Notify the publisher on receipt of a data event. + * * @param ack acknowledgement received from the subscriber. */ default void notify(RecordsDeliveryAck ack) { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java index b96ff8c1..27cad136 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java @@ -37,6 +37,7 @@ import software.amazon.awssdk.utils.Either; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.KinesisRequestsBuilder; +import software.amazon.kinesis.common.RequestDetails; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.retrieval.BatchUniqueIdentifier; import software.amazon.kinesis.retrieval.IteratorBuilder; @@ -50,6 +51,7 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import java.time.Instant; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -87,6 +89,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher { private BlockingQueue recordsDeliveryQueue = new LinkedBlockingQueue<>( MAX_EVENT_BURST_FROM_SERVICE); + private RequestDetails lastSuccessfulRequestDetails = new RequestDetails(); + @Override public void start(ExtendedSequenceNumber extendedSequenceNumber, InitialPositionInStreamExtended initialPositionInStreamExtended) { @@ -143,6 +147,15 @@ public class FanOutRecordsPublisher implements RecordsPublisher { } } + @Override + public RequestDetails getLastSuccessfulRequestDetails() { + return lastSuccessfulRequestDetails; + } + + private void setLastSuccessfulRequestDetails(RequestDetails requestDetails) { + lastSuccessfulRequestDetails = requestDetails; + } + // This method is not thread-safe. You need to acquire a lock in the caller in order to execute this. @VisibleForTesting RecordFlow evictAckedEventAndScheduleNextEvent(RecordsDeliveryAck recordsDeliveryAck) { @@ -204,8 +217,9 @@ public class FanOutRecordsPublisher implements RecordsPublisher { subscriber.onNext(recordsRetrieved); } } catch (IllegalStateException e) { - log.warn("{}: Unable to enqueue the payload due to capacity restrictions in delivery queue with remaining capacity {} ", - shardId, recordsDeliveryQueue.remainingCapacity()); + + log.warn("{}: Unable to enqueue the payload due to capacity restrictions in delivery queue with remaining capacity {}. Last successful request details -- {}", + shardId, recordsDeliveryQueue.remainingCapacity(), lastSuccessfulRequestDetails); throw e; } catch (Throwable t) { log.error("{}: Unable to deliver event to the shard consumer.", shardId, t); @@ -288,12 +302,13 @@ public class FanOutRecordsPublisher implements RecordsPublisher { if (!hasValidSubscriber()) { if(hasValidFlow()) { log.warn( - "{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ {} id: {} -- Subscriber is null", - shardId, flow.connectionStartedAt, flow.subscribeToShardId); + "{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ {} id: {} -- Subscriber is null." + + " Last successful request details -- {}", shardId, flow.connectionStartedAt, + flow.subscribeToShardId, lastSuccessfulRequestDetails); } else { log.warn( - "{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) -- Subscriber and flow are null", - shardId); + "{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) -- Subscriber and flow are null." + + " Last successful request details -- {}", shardId, lastSuccessfulRequestDetails); } return; } @@ -304,8 +319,9 @@ public class FanOutRecordsPublisher implements RecordsPublisher { if (isActiveFlow(triggeringFlow)) { if (flow != null) { String logMessage = String.format( - "%s: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ %s id: %s -- %s", - shardId, flow.connectionStartedAt, flow.subscribeToShardId, category.throwableTypeString); + "%s: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ %s id: %s -- %s." + + " Last successful request details -- %s", + shardId, flow.connectionStartedAt, flow.subscribeToShardId, category.throwableTypeString, lastSuccessfulRequestDetails); switch (category.throwableType) { case READ_TIMEOUT: log.debug(logMessage, propagationThrowable); @@ -329,7 +345,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { try { handleFlowError(propagationThrowable, triggeringFlow); } catch (Throwable innerThrowable) { - log.warn("{}: Exception while calling subscriber.onError", shardId, innerThrowable); + log.warn("{}: Exception while calling subscriber.onError. Last successful request details -- {}", shardId, lastSuccessfulRequestDetails, innerThrowable); } subscriber = null; flow = null; @@ -351,7 +367,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { // Clear any lingering records in the queue. if (!recordsDeliveryQueue.isEmpty()) { log.warn("{}: Found non-empty queue while starting subscription. This indicates unsuccessful clean up of" - + "previous subscription - {} ", shardId, subscribeToShardId); + + "previous subscription - {}. Last successful request details -- {}", shardId, subscribeToShardId, lastSuccessfulRequestDetails); recordsDeliveryQueue.clear(); } } @@ -461,7 +477,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher { try { bufferCurrentEventAndScheduleIfRequired(recordsRetrieved, triggeringFlow); } catch (Throwable t) { - log.warn("{}: Unable to buffer or schedule onNext for subscriber. Failing publisher.", shardId); + log.warn("{}: Unable to buffer or schedule onNext for subscriber. Failing publisher." + + " Last successful request details -- {}", shardId, lastSuccessfulRequestDetails); errorOccurred(triggeringFlow, t); } } @@ -557,8 +574,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher { synchronized (lockObject) { if (subscriber != s) { log.warn( - "{}: (FanOutRecordsPublisher/Subscription#request) - Rejected an attempt to request({}), because subscribers don't match.", - shardId, n); + "{}: (FanOutRecordsPublisher/Subscription#request) - Rejected an attempt to request({}), because subscribers don't match. Last successful request details -- {}", + shardId, n, lastSuccessfulRequestDetails); return; } if (flow == null) { @@ -584,14 +601,14 @@ public class FanOutRecordsPublisher implements RecordsPublisher { synchronized (lockObject) { if (subscriber != s) { log.warn( - "{}: (FanOutRecordsPublisher/Subscription#cancel) - Rejected attempt to cancel subscription, because subscribers don't match.", - shardId); + "{}: (FanOutRecordsPublisher/Subscription#cancel) - Rejected attempt to cancel subscription, because subscribers don't match. Last successful request details -- {}", + shardId, lastSuccessfulRequestDetails); return; } if (!hasValidSubscriber()) { log.warn( - "{}: (FanOutRecordsPublisher/Subscription#cancel) - Cancelled called even with an invalid subscriber", - shardId); + "{}: (FanOutRecordsPublisher/Subscription#cancel) - Cancelled called even with an invalid subscriber. Last successful request details -- {}", + shardId, lastSuccessfulRequestDetails); } subscriber = null; if (flow != null) { @@ -718,8 +735,11 @@ public class FanOutRecordsPublisher implements RecordsPublisher { @Override public void responseReceived(SubscribeToShardResponse response) { - log.debug("{}: [SubscriptionLifetime]: (RecordFlow#responseReceived) @ {} id: {} -- Response received", - parent.shardId, connectionStartedAt, subscribeToShardId); + log.debug("{}: [SubscriptionLifetime]: (RecordFlow#responseReceived) @ {} id: {} -- Response received. Request id - {}", + parent.shardId, connectionStartedAt, subscribeToShardId, response.responseMetadata().requestId()); + + final RequestDetails requestDetails = new RequestDetails(response.responseMetadata().requestId(), connectionStartedAt.toString()); + parent.setLastSuccessfulRequestDetails(requestDetails); } @Override @@ -781,10 +801,9 @@ public class FanOutRecordsPublisher implements RecordsPublisher { .add(new RecordsRetrievedContext(Either.right(subscriptionShutdownEvent), this, Instant.now())); } catch (Exception e) { log.warn( - "{}: Unable to enqueue the {} shutdown event due to capacity restrictions in delivery queue with remaining capacity {}. Ignoring. ", - parent.shardId, subscriptionShutdownEvent.getEventIdentifier(), - parent.recordsDeliveryQueue.remainingCapacity(), - subscriptionShutdownEvent.getShutdownEventThrowableOptional()); + "{}: Unable to enqueue the {} shutdown event due to capacity restrictions in delivery queue with remaining capacity {}. Ignoring. Last successful request details -- {}", + parent.shardId, subscriptionShutdownEvent.getEventIdentifier(), parent.recordsDeliveryQueue.remainingCapacity(), + parent.lastSuccessfulRequestDetails, subscriptionShutdownEvent.getShutdownEventThrowableOptional()); } } @@ -800,13 +819,14 @@ public class FanOutRecordsPublisher implements RecordsPublisher { // the // subscription, which was cancelled for a reason (usually queue overflow). // - log.warn("{}: complete called on a cancelled subscription. Ignoring completion", parent.shardId); + log.warn("{}: complete called on a cancelled subscription. Ignoring completion. Last successful request details -- {}", + parent.shardId, parent.lastSuccessfulRequestDetails); return; } if (this.isDisposed) { log.warn( - "{}: [SubscriptionLifetime]: (RecordFlow#complete) @ {} id: {} -- This flow has been disposed not dispatching completion", - parent.shardId, connectionStartedAt, subscribeToShardId); + "{}: [SubscriptionLifetime]: (RecordFlow#complete) @ {} id: {} -- This flow has been disposed not dispatching completion. Last successful request details -- {}", + parent.shardId, connectionStartedAt, subscribeToShardId, parent.lastSuccessfulRequestDetails); return; } @@ -942,7 +962,6 @@ public class FanOutRecordsPublisher implements RecordsPublisher { log.debug( "{}: [SubscriptionLifetime]: (RecordSubscription#onComplete) @ {} id: {} -- Allowing RecordFlow to call onComplete", parent.shardId, connectionStartedAt, subscribeToShardId); - } } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/BlockingRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/BlockingRecordsPublisher.java index d6df725a..1e6462f5 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/BlockingRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/BlockingRecordsPublisher.java @@ -15,6 +15,7 @@ package software.amazon.kinesis.retrieval.polling; +import java.time.Instant; import java.util.List; import java.util.stream.Collectors; @@ -23,6 +24,7 @@ import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; +import software.amazon.kinesis.common.RequestDetails; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; import software.amazon.kinesis.retrieval.KinesisClientRecord; @@ -40,6 +42,7 @@ public class BlockingRecordsPublisher implements RecordsPublisher { private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; private Subscriber subscriber; + private RequestDetails lastSuccessfulRequestDetails = new RequestDetails(); public BlockingRecordsPublisher(final int maxRecordsPerCall, final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) { @@ -57,6 +60,8 @@ public class BlockingRecordsPublisher implements RecordsPublisher { public ProcessRecordsInput getNextResult() { GetRecordsResponse getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall); + final RequestDetails getRecordsRequestDetails = new RequestDetails(getRecordsResult.responseMetadata().requestId(), Instant.now().toString()); + setLastSuccessfulRequestDetails(getRecordsRequestDetails); List records = getRecordsResult.records().stream() .map(KinesisClientRecord::fromRecord).collect(Collectors.toList()); return ProcessRecordsInput.builder() @@ -70,6 +75,15 @@ public class BlockingRecordsPublisher implements RecordsPublisher { getRecordsRetrievalStrategy.shutdown(); } + private void setLastSuccessfulRequestDetails(RequestDetails requestDetails) { + lastSuccessfulRequestDetails = requestDetails; + } + + @Override + public RequestDetails getLastSuccessfulRequestDetails() { + return lastSuccessfulRequestDetails; + } + @Override public void subscribe(Subscriber s) { subscriber = s; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java index 3f28ddf4..dcd5e043 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java @@ -46,6 +46,7 @@ import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException; import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.common.RequestDetails; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.metrics.MetricsLevel; @@ -98,6 +99,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { private boolean wasReset = false; private Instant lastEventDeliveryTime = Instant.EPOCH; + private final RequestDetails lastSuccessfulRequestDetails = new RequestDetails(); @Data @Accessors(fluent = true) @@ -260,6 +262,11 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { started = false; } + @Override + public RequestDetails getLastSuccessfulRequestDetails() { + return lastSuccessfulRequestDetails; + } + @Override public void restartFrom(RecordsRetrieved recordsRetrieved) { if (!(recordsRetrieved instanceof PrefetchRecordsRetrieved)) { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java index 4d0f01ee..78e09fa1 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java @@ -35,6 +35,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.LinkedList; import java.util.List; +import java.util.Optional; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -61,6 +62,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.common.RequestDetails; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.retrieval.KinesisClientRecord; @@ -77,6 +79,8 @@ public class ShardConsumerSubscriberTest { private static final String TERMINAL_MARKER = "Terminal"; + private final RequestDetails lastSuccessfulRequestDetails = new RequestDetails(); + @Mock private ShardConsumer shardConsumer; @Mock @@ -556,6 +560,11 @@ public class ShardConsumerSubscriberTest { } + @Override + public RequestDetails getLastSuccessfulRequestDetails() { + return lastSuccessfulRequestDetails; + } + @Override public void subscribe(Subscriber s) { subscriber = s; diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java index e5db170b..46677fb9 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java @@ -70,6 +70,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.extern.slf4j.Slf4j; import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.common.RequestDetails; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.TaskExecutionListenerInput; @@ -88,6 +89,7 @@ public class ShardConsumerTest { private final String shardId = "shardId-0-0"; private final String concurrencyToken = "TestToken"; + private final RequestDetails lastSuccessfulRequestDetails = new RequestDetails(); private ShardInfo shardInfo; private TaskExecutionListenerInput initialTaskInput; private TaskExecutionListenerInput processTaskInput; @@ -209,6 +211,11 @@ public class ShardConsumerTest { } + @Override + public RequestDetails getLastSuccessfulRequestDetails() { + return lastSuccessfulRequestDetails; + } + @Override public void subscribe(Subscriber s) { subscriber = s; diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java index 6c7b09eb..f5772aaf 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java @@ -54,7 +54,6 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; -import com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang3.StringUtils; import org.junit.After; import org.junit.Before; @@ -76,6 +75,7 @@ import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException; import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; import software.amazon.awssdk.services.kinesis.model.Record; import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.common.RequestDetails; import software.amazon.kinesis.lifecycle.ShardConsumerNotifyingSubscriber; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.metrics.NullMetricsFactory; @@ -115,6 +115,7 @@ public class PrefetchRecordsPublisherTest { private String operation = "ProcessTask"; private GetRecordsResponse getRecordsResponse; private Record record; + private RequestDetails requestDetails; @Before public void setup() {