Addressing PR feedback.

This commit is contained in:
jushkem 2020-02-17 10:35:20 -08:00
parent dee8bd5d78
commit 64db125496
5 changed files with 19 additions and 17 deletions

View file

@ -1,14 +1,10 @@
package software.amazon.kinesis.common; package software.amazon.kinesis.common;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors; import lombok.experimental.Accessors;
import java.util.Optional; import java.util.Optional;
@Accessors(fluent=true) @Accessors(fluent=true)
@Getter
public class RequestDetails { public class RequestDetails {
/** /**
@ -20,12 +16,13 @@ public class RequestDetails {
private final Optional<String> timestamp; private final Optional<String> timestamp;
public RequestDetails() { public RequestDetails() {
this(Optional.empty(), Optional.empty()); this.requestId = Optional.empty();
this.timestamp = Optional.empty();
} }
public RequestDetails(Optional<String> requestId, Optional<String> timestamp) { public RequestDetails(String requestId, String timestamp) {
this.requestId = requestId; this.requestId = Optional.of(requestId);
this.timestamp = timestamp; this.timestamp = Optional.of(timestamp);
} }
/** /**

View file

@ -698,7 +698,6 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
private boolean isDisposed = false; private boolean isDisposed = false;
private boolean isErrorDispatched = false; private boolean isErrorDispatched = false;
private boolean isCancelled = false; private boolean isCancelled = false;
private RequestDetails recordFlowLastSuccessfuRequestDetails;
@Override @Override
public void onEventStream(SdkPublisher<SubscribeToShardEventStream> publisher) { public void onEventStream(SdkPublisher<SubscribeToShardEventStream> publisher) {
@ -736,11 +735,11 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
@Override @Override
public void responseReceived(SubscribeToShardResponse response) { public void responseReceived(SubscribeToShardResponse response) {
log.debug("{}: [SubscriptionLifetime]: (RecordFlow#responseReceived) @ {} id: {} -- Response received. RequestId - {} -- Last successful request details -- {} ", log.debug("{}: [SubscriptionLifetime]: (RecordFlow#responseReceived) @ {} id: {} -- Response received. RequestId - {}",
parent.shardId, connectionStartedAt, subscribeToShardId, response.responseMetadata().requestId(), parent.lastSuccessfulRequestDetails); parent.shardId, connectionStartedAt, subscribeToShardId, response.responseMetadata().requestId());
recordFlowLastSuccessfuRequestDetails = new RequestDetails(Optional.of(response.responseMetadata().requestId()), Optional.of(connectionStartedAt.toString())); final RequestDetails requestDetails = new RequestDetails(response.responseMetadata().requestId(), connectionStartedAt.toString());
parent.setLastSuccessfulRequestDetails(this.recordFlowLastSuccessfuRequestDetails); parent.setLastSuccessfulRequestDetails(requestDetails);
} }
@Override @Override

View file

@ -15,8 +15,8 @@
package software.amazon.kinesis.retrieval.polling; package software.amazon.kinesis.retrieval.polling;
import java.time.Instant;
import java.util.List; import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.reactivestreams.Subscriber; import org.reactivestreams.Subscriber;
@ -42,7 +42,7 @@ public class BlockingRecordsPublisher implements RecordsPublisher {
private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
private Subscriber<? super RecordsRetrieved> subscriber; private Subscriber<? super RecordsRetrieved> subscriber;
private final RequestDetails lastSuccessfulRequestDetails = new RequestDetails(); private RequestDetails lastSuccessfulRequestDetails = new RequestDetails();
public BlockingRecordsPublisher(final int maxRecordsPerCall, public BlockingRecordsPublisher(final int maxRecordsPerCall,
final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) { final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) {
@ -60,6 +60,8 @@ public class BlockingRecordsPublisher implements RecordsPublisher {
public ProcessRecordsInput getNextResult() { public ProcessRecordsInput getNextResult() {
GetRecordsResponse getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall); GetRecordsResponse getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall);
final RequestDetails getRecordsRequestDetails = new RequestDetails(getRecordsResult.responseMetadata().requestId(), Instant.now().toString());
setLastSuccessfulRequestDetails(getRecordsRequestDetails);
List<KinesisClientRecord> records = getRecordsResult.records().stream() List<KinesisClientRecord> records = getRecordsResult.records().stream()
.map(KinesisClientRecord::fromRecord).collect(Collectors.toList()); .map(KinesisClientRecord::fromRecord).collect(Collectors.toList());
return ProcessRecordsInput.builder() return ProcessRecordsInput.builder()
@ -73,6 +75,10 @@ public class BlockingRecordsPublisher implements RecordsPublisher {
getRecordsRetrievalStrategy.shutdown(); getRecordsRetrievalStrategy.shutdown();
} }
private void setLastSuccessfulRequestDetails(RequestDetails requestDetails) {
lastSuccessfulRequestDetails = requestDetails;
}
@Override @Override
public RequestDetails getLastSuccessfulRequestDetails() { public RequestDetails getLastSuccessfulRequestDetails() {
return lastSuccessfulRequestDetails; return lastSuccessfulRequestDetails;

View file

@ -18,7 +18,6 @@ package software.amazon.kinesis.retrieval.polling;
import java.time.Duration; import java.time.Duration;
import java.time.Instant; import java.time.Instant;
import java.util.List; import java.util.List;
import java.util.Optional;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;

View file

@ -54,7 +54,6 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream; import java.util.stream.IntStream;
import java.util.stream.Stream; import java.util.stream.Stream;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
@ -76,6 +75,7 @@ import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.Record; import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.RequestDetails;
import software.amazon.kinesis.lifecycle.ShardConsumerNotifyingSubscriber; import software.amazon.kinesis.lifecycle.ShardConsumerNotifyingSubscriber;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.metrics.NullMetricsFactory; import software.amazon.kinesis.metrics.NullMetricsFactory;
@ -115,6 +115,7 @@ public class PrefetchRecordsPublisherTest {
private String operation = "ProcessTask"; private String operation = "ProcessTask";
private GetRecordsResponse getRecordsResponse; private GetRecordsResponse getRecordsResponse;
private Record record; private Record record;
private RequestDetails requestDetails;
@Before @Before
public void setup() { public void setup() {