diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/RequestDetails.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/RequestDetails.java index bc170920..ca14155e 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/RequestDetails.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/RequestDetails.java @@ -1,14 +1,10 @@ package software.amazon.kinesis.common; -import lombok.AllArgsConstructor; -import lombok.Getter; -import lombok.NoArgsConstructor; import lombok.experimental.Accessors; import java.util.Optional; @Accessors(fluent=true) -@Getter public class RequestDetails { /** @@ -20,12 +16,13 @@ public class RequestDetails { private final Optional timestamp; public RequestDetails() { - this(Optional.empty(), Optional.empty()); + this.requestId = Optional.empty(); + this.timestamp = Optional.empty(); } - public RequestDetails(Optional requestId, Optional timestamp) { - this.requestId = requestId; - this.timestamp = timestamp; + public RequestDetails(String requestId, String timestamp) { + this.requestId = Optional.of(requestId); + this.timestamp = Optional.of(timestamp); } /** diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java index 0b837bde..dd0a0397 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java @@ -698,7 +698,6 @@ public class FanOutRecordsPublisher implements RecordsPublisher { private boolean isDisposed = false; private boolean isErrorDispatched = false; private boolean isCancelled = false; - private RequestDetails recordFlowLastSuccessfuRequestDetails; @Override public void onEventStream(SdkPublisher publisher) { @@ -736,11 +735,11 @@ public class FanOutRecordsPublisher implements RecordsPublisher { @Override public void responseReceived(SubscribeToShardResponse response) { - log.debug("{}: [SubscriptionLifetime]: (RecordFlow#responseReceived) @ {} id: {} -- Response received. RequestId - {} -- Last successful request details -- {} ", - parent.shardId, connectionStartedAt, subscribeToShardId, response.responseMetadata().requestId(), parent.lastSuccessfulRequestDetails); + log.debug("{}: [SubscriptionLifetime]: (RecordFlow#responseReceived) @ {} id: {} -- Response received. RequestId - {}", + parent.shardId, connectionStartedAt, subscribeToShardId, response.responseMetadata().requestId()); - recordFlowLastSuccessfuRequestDetails = new RequestDetails(Optional.of(response.responseMetadata().requestId()), Optional.of(connectionStartedAt.toString())); - parent.setLastSuccessfulRequestDetails(this.recordFlowLastSuccessfuRequestDetails); + final RequestDetails requestDetails = new RequestDetails(response.responseMetadata().requestId(), connectionStartedAt.toString()); + parent.setLastSuccessfulRequestDetails(requestDetails); } @Override diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/BlockingRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/BlockingRecordsPublisher.java index 23004b51..1e6462f5 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/BlockingRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/BlockingRecordsPublisher.java @@ -15,8 +15,8 @@ package software.amazon.kinesis.retrieval.polling; +import java.time.Instant; import java.util.List; -import java.util.Optional; import java.util.stream.Collectors; import org.reactivestreams.Subscriber; @@ -42,7 +42,7 @@ public class BlockingRecordsPublisher implements RecordsPublisher { private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; private Subscriber subscriber; - private final RequestDetails lastSuccessfulRequestDetails = new RequestDetails(); + private RequestDetails lastSuccessfulRequestDetails = new RequestDetails(); public BlockingRecordsPublisher(final int maxRecordsPerCall, final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) { @@ -60,6 +60,8 @@ public class BlockingRecordsPublisher implements RecordsPublisher { public ProcessRecordsInput getNextResult() { GetRecordsResponse getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall); + final RequestDetails getRecordsRequestDetails = new RequestDetails(getRecordsResult.responseMetadata().requestId(), Instant.now().toString()); + setLastSuccessfulRequestDetails(getRecordsRequestDetails); List records = getRecordsResult.records().stream() .map(KinesisClientRecord::fromRecord).collect(Collectors.toList()); return ProcessRecordsInput.builder() @@ -73,6 +75,10 @@ public class BlockingRecordsPublisher implements RecordsPublisher { getRecordsRetrievalStrategy.shutdown(); } + private void setLastSuccessfulRequestDetails(RequestDetails requestDetails) { + lastSuccessfulRequestDetails = requestDetails; + } + @Override public RequestDetails getLastSuccessfulRequestDetails() { return lastSuccessfulRequestDetails; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java index a0773bf8..dcd5e043 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java @@ -18,7 +18,6 @@ package software.amazon.kinesis.retrieval.polling; import java.time.Duration; import java.time.Instant; import java.util.List; -import java.util.Optional; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java index 6c7b09eb..f5772aaf 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java @@ -54,7 +54,6 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; -import com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang3.StringUtils; import org.junit.After; import org.junit.Before; @@ -76,6 +75,7 @@ import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException; import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; import software.amazon.awssdk.services.kinesis.model.Record; import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.common.RequestDetails; import software.amazon.kinesis.lifecycle.ShardConsumerNotifyingSubscriber; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.metrics.NullMetricsFactory; @@ -115,6 +115,7 @@ public class PrefetchRecordsPublisherTest { private String operation = "ProcessTask"; private GetRecordsResponse getRecordsResponse; private Record record; + private RequestDetails requestDetails; @Before public void setup() {