Adding request id logging to SubscribeToShard response. (#678)
This commit is contained in:
parent
b3bcc59697
commit
189df4bc90
9 changed files with 155 additions and 32 deletions
|
|
@ -0,0 +1,52 @@
|
||||||
|
package software.amazon.kinesis.common;
|
||||||
|
|
||||||
|
import lombok.experimental.Accessors;
|
||||||
|
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
|
@Accessors(fluent=true)
|
||||||
|
public class RequestDetails {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Placeholder for logging when no successful request has been made.
|
||||||
|
*/
|
||||||
|
private static final String NONE = "NONE";
|
||||||
|
|
||||||
|
private final Optional<String> requestId;
|
||||||
|
private final Optional<String> timestamp;
|
||||||
|
|
||||||
|
public RequestDetails() {
|
||||||
|
this.requestId = Optional.empty();
|
||||||
|
this.timestamp = Optional.empty();
|
||||||
|
}
|
||||||
|
|
||||||
|
public RequestDetails(String requestId, String timestamp) {
|
||||||
|
this.requestId = Optional.of(requestId);
|
||||||
|
this.timestamp = Optional.of(timestamp);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets last successful request's request id.
|
||||||
|
*
|
||||||
|
* @return requestId associated with last successful request.
|
||||||
|
*/
|
||||||
|
public String getRequestId() {
|
||||||
|
return requestId.orElse(NONE);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets last successful request's timestamp.
|
||||||
|
*
|
||||||
|
* @return timestamp associated with last successful request.
|
||||||
|
*/
|
||||||
|
public String getTimestamp() {
|
||||||
|
return timestamp.orElse(NONE);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return String.format("request id - %s, timestamp - %s", getRequestId(), getTimestamp());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
@ -129,8 +129,8 @@ class ShardConsumerSubscriber implements Subscriber<RecordsRetrieved> {
|
||||||
Duration timeSinceLastResponse = Duration.between(lastRequestTime, now);
|
Duration timeSinceLastResponse = Duration.between(lastRequestTime, now);
|
||||||
if (timeSinceLastResponse.toMillis() > maxTimeBetweenRequests) {
|
if (timeSinceLastResponse.toMillis() > maxTimeBetweenRequests) {
|
||||||
log.error(
|
log.error(
|
||||||
"{}: Last request was dispatched at {}, but no response as of {} ({}). Cancelling subscription, and restarting.",
|
"{}: Last request was dispatched at {}, but no response as of {} ({}). Cancelling subscription, and restarting. Last successful request details -- {}",
|
||||||
shardConsumer.shardInfo().shardId(), lastRequestTime, now, timeSinceLastResponse);
|
shardConsumer.shardInfo().shardId(), lastRequestTime, now, timeSinceLastResponse, recordsPublisher.getLastSuccessfulRequestDetails());
|
||||||
cancel();
|
cancel();
|
||||||
|
|
||||||
// Start the subscription again which will update the lastRequestTime as well.
|
// Start the subscription again which will update the lastRequestTime as well.
|
||||||
|
|
|
||||||
|
|
@ -18,16 +18,22 @@ package software.amazon.kinesis.retrieval;
|
||||||
import org.reactivestreams.Publisher;
|
import org.reactivestreams.Publisher;
|
||||||
|
|
||||||
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
|
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
|
||||||
|
import software.amazon.kinesis.common.RequestDetails;
|
||||||
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
|
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
|
||||||
|
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Provides a record publisher that will retrieve records from Kinesis for processing
|
* Provides a record publisher that will retrieve records from Kinesis for processing
|
||||||
*/
|
*/
|
||||||
public interface RecordsPublisher extends Publisher<RecordsRetrieved> {
|
public interface RecordsPublisher extends Publisher<RecordsRetrieved> {
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Initializes the publisher with where to start processing. If there is a stored sequence number the publisher will
|
* Initializes the publisher with where to start processing. If there is a stored sequence number the publisher will
|
||||||
* begin from that sequence number, otherwise it will use the initial position.
|
* begin from that sequence number, otherwise it will use the initial position.
|
||||||
*
|
*
|
||||||
* @param extendedSequenceNumber
|
* @param extendedSequenceNumber
|
||||||
* the sequence number to start processing from
|
* the sequence number to start processing from
|
||||||
* @param initialPositionInStreamExtended
|
* @param initialPositionInStreamExtended
|
||||||
|
|
@ -40,15 +46,23 @@ public interface RecordsPublisher extends Publisher<RecordsRetrieved> {
|
||||||
* @param recordsRetrieved the processRecordsInput to restart from
|
* @param recordsRetrieved the processRecordsInput to restart from
|
||||||
*/
|
*/
|
||||||
void restartFrom(RecordsRetrieved recordsRetrieved);
|
void restartFrom(RecordsRetrieved recordsRetrieved);
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Shutdowns the publisher. Once this method returns the publisher should no longer provide any records.
|
* Shutdowns the publisher. Once this method returns the publisher should no longer provide any records.
|
||||||
*/
|
*/
|
||||||
void shutdown();
|
void shutdown();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets last successful request details.
|
||||||
|
*
|
||||||
|
* @return details associated with last successful request.
|
||||||
|
*/
|
||||||
|
RequestDetails getLastSuccessfulRequestDetails();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Notify the publisher on receipt of a data event.
|
* Notify the publisher on receipt of a data event.
|
||||||
|
*
|
||||||
* @param ack acknowledgement received from the subscriber.
|
* @param ack acknowledgement received from the subscriber.
|
||||||
*/
|
*/
|
||||||
default void notify(RecordsDeliveryAck ack) {
|
default void notify(RecordsDeliveryAck ack) {
|
||||||
|
|
|
||||||
|
|
@ -37,6 +37,7 @@ import software.amazon.awssdk.utils.Either;
|
||||||
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
|
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
|
||||||
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
|
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
|
||||||
import software.amazon.kinesis.common.KinesisRequestsBuilder;
|
import software.amazon.kinesis.common.KinesisRequestsBuilder;
|
||||||
|
import software.amazon.kinesis.common.RequestDetails;
|
||||||
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
|
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
|
||||||
import software.amazon.kinesis.retrieval.BatchUniqueIdentifier;
|
import software.amazon.kinesis.retrieval.BatchUniqueIdentifier;
|
||||||
import software.amazon.kinesis.retrieval.IteratorBuilder;
|
import software.amazon.kinesis.retrieval.IteratorBuilder;
|
||||||
|
|
@ -50,6 +51,7 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
|
||||||
import java.time.Instant;
|
import java.time.Instant;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Optional;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
|
@ -87,6 +89,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
|
||||||
private BlockingQueue<RecordsRetrievedContext> recordsDeliveryQueue = new LinkedBlockingQueue<>(
|
private BlockingQueue<RecordsRetrievedContext> recordsDeliveryQueue = new LinkedBlockingQueue<>(
|
||||||
MAX_EVENT_BURST_FROM_SERVICE);
|
MAX_EVENT_BURST_FROM_SERVICE);
|
||||||
|
|
||||||
|
private RequestDetails lastSuccessfulRequestDetails = new RequestDetails();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void start(ExtendedSequenceNumber extendedSequenceNumber,
|
public void start(ExtendedSequenceNumber extendedSequenceNumber,
|
||||||
InitialPositionInStreamExtended initialPositionInStreamExtended) {
|
InitialPositionInStreamExtended initialPositionInStreamExtended) {
|
||||||
|
|
@ -143,6 +147,15 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RequestDetails getLastSuccessfulRequestDetails() {
|
||||||
|
return lastSuccessfulRequestDetails;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setLastSuccessfulRequestDetails(RequestDetails requestDetails) {
|
||||||
|
lastSuccessfulRequestDetails = requestDetails;
|
||||||
|
}
|
||||||
|
|
||||||
// This method is not thread-safe. You need to acquire a lock in the caller in order to execute this.
|
// This method is not thread-safe. You need to acquire a lock in the caller in order to execute this.
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
RecordFlow evictAckedEventAndScheduleNextEvent(RecordsDeliveryAck recordsDeliveryAck) {
|
RecordFlow evictAckedEventAndScheduleNextEvent(RecordsDeliveryAck recordsDeliveryAck) {
|
||||||
|
|
@ -204,8 +217,9 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
|
||||||
subscriber.onNext(recordsRetrieved);
|
subscriber.onNext(recordsRetrieved);
|
||||||
}
|
}
|
||||||
} catch (IllegalStateException e) {
|
} catch (IllegalStateException e) {
|
||||||
log.warn("{}: Unable to enqueue the payload due to capacity restrictions in delivery queue with remaining capacity {} ",
|
|
||||||
shardId, recordsDeliveryQueue.remainingCapacity());
|
log.warn("{}: Unable to enqueue the payload due to capacity restrictions in delivery queue with remaining capacity {}. Last successful request details -- {}",
|
||||||
|
shardId, recordsDeliveryQueue.remainingCapacity(), lastSuccessfulRequestDetails);
|
||||||
throw e;
|
throw e;
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
log.error("{}: Unable to deliver event to the shard consumer.", shardId, t);
|
log.error("{}: Unable to deliver event to the shard consumer.", shardId, t);
|
||||||
|
|
@ -288,12 +302,13 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
|
||||||
if (!hasValidSubscriber()) {
|
if (!hasValidSubscriber()) {
|
||||||
if(hasValidFlow()) {
|
if(hasValidFlow()) {
|
||||||
log.warn(
|
log.warn(
|
||||||
"{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ {} id: {} -- Subscriber is null",
|
"{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ {} id: {} -- Subscriber is null." +
|
||||||
shardId, flow.connectionStartedAt, flow.subscribeToShardId);
|
" Last successful request details -- {}", shardId, flow.connectionStartedAt,
|
||||||
|
flow.subscribeToShardId, lastSuccessfulRequestDetails);
|
||||||
} else {
|
} else {
|
||||||
log.warn(
|
log.warn(
|
||||||
"{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) -- Subscriber and flow are null",
|
"{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) -- Subscriber and flow are null." +
|
||||||
shardId);
|
" Last successful request details -- {}", shardId, lastSuccessfulRequestDetails);
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
@ -304,8 +319,9 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
|
||||||
if (isActiveFlow(triggeringFlow)) {
|
if (isActiveFlow(triggeringFlow)) {
|
||||||
if (flow != null) {
|
if (flow != null) {
|
||||||
String logMessage = String.format(
|
String logMessage = String.format(
|
||||||
"%s: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ %s id: %s -- %s",
|
"%s: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ %s id: %s -- %s." +
|
||||||
shardId, flow.connectionStartedAt, flow.subscribeToShardId, category.throwableTypeString);
|
" Last successful request details -- %s",
|
||||||
|
shardId, flow.connectionStartedAt, flow.subscribeToShardId, category.throwableTypeString, lastSuccessfulRequestDetails);
|
||||||
switch (category.throwableType) {
|
switch (category.throwableType) {
|
||||||
case READ_TIMEOUT:
|
case READ_TIMEOUT:
|
||||||
log.debug(logMessage, propagationThrowable);
|
log.debug(logMessage, propagationThrowable);
|
||||||
|
|
@ -329,7 +345,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
|
||||||
try {
|
try {
|
||||||
handleFlowError(propagationThrowable, triggeringFlow);
|
handleFlowError(propagationThrowable, triggeringFlow);
|
||||||
} catch (Throwable innerThrowable) {
|
} catch (Throwable innerThrowable) {
|
||||||
log.warn("{}: Exception while calling subscriber.onError", shardId, innerThrowable);
|
log.warn("{}: Exception while calling subscriber.onError. Last successful request details -- {}", shardId, lastSuccessfulRequestDetails, innerThrowable);
|
||||||
}
|
}
|
||||||
subscriber = null;
|
subscriber = null;
|
||||||
flow = null;
|
flow = null;
|
||||||
|
|
@ -351,7 +367,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
|
||||||
// Clear any lingering records in the queue.
|
// Clear any lingering records in the queue.
|
||||||
if (!recordsDeliveryQueue.isEmpty()) {
|
if (!recordsDeliveryQueue.isEmpty()) {
|
||||||
log.warn("{}: Found non-empty queue while starting subscription. This indicates unsuccessful clean up of"
|
log.warn("{}: Found non-empty queue while starting subscription. This indicates unsuccessful clean up of"
|
||||||
+ "previous subscription - {} ", shardId, subscribeToShardId);
|
+ "previous subscription - {}. Last successful request details -- {}", shardId, subscribeToShardId, lastSuccessfulRequestDetails);
|
||||||
recordsDeliveryQueue.clear();
|
recordsDeliveryQueue.clear();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -461,7 +477,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
|
||||||
try {
|
try {
|
||||||
bufferCurrentEventAndScheduleIfRequired(recordsRetrieved, triggeringFlow);
|
bufferCurrentEventAndScheduleIfRequired(recordsRetrieved, triggeringFlow);
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
log.warn("{}: Unable to buffer or schedule onNext for subscriber. Failing publisher.", shardId);
|
log.warn("{}: Unable to buffer or schedule onNext for subscriber. Failing publisher." +
|
||||||
|
" Last successful request details -- {}", shardId, lastSuccessfulRequestDetails);
|
||||||
errorOccurred(triggeringFlow, t);
|
errorOccurred(triggeringFlow, t);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -557,8 +574,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
|
||||||
synchronized (lockObject) {
|
synchronized (lockObject) {
|
||||||
if (subscriber != s) {
|
if (subscriber != s) {
|
||||||
log.warn(
|
log.warn(
|
||||||
"{}: (FanOutRecordsPublisher/Subscription#request) - Rejected an attempt to request({}), because subscribers don't match.",
|
"{}: (FanOutRecordsPublisher/Subscription#request) - Rejected an attempt to request({}), because subscribers don't match. Last successful request details -- {}",
|
||||||
shardId, n);
|
shardId, n, lastSuccessfulRequestDetails);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (flow == null) {
|
if (flow == null) {
|
||||||
|
|
@ -584,14 +601,14 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
|
||||||
synchronized (lockObject) {
|
synchronized (lockObject) {
|
||||||
if (subscriber != s) {
|
if (subscriber != s) {
|
||||||
log.warn(
|
log.warn(
|
||||||
"{}: (FanOutRecordsPublisher/Subscription#cancel) - Rejected attempt to cancel subscription, because subscribers don't match.",
|
"{}: (FanOutRecordsPublisher/Subscription#cancel) - Rejected attempt to cancel subscription, because subscribers don't match. Last successful request details -- {}",
|
||||||
shardId);
|
shardId, lastSuccessfulRequestDetails);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (!hasValidSubscriber()) {
|
if (!hasValidSubscriber()) {
|
||||||
log.warn(
|
log.warn(
|
||||||
"{}: (FanOutRecordsPublisher/Subscription#cancel) - Cancelled called even with an invalid subscriber",
|
"{}: (FanOutRecordsPublisher/Subscription#cancel) - Cancelled called even with an invalid subscriber. Last successful request details -- {}",
|
||||||
shardId);
|
shardId, lastSuccessfulRequestDetails);
|
||||||
}
|
}
|
||||||
subscriber = null;
|
subscriber = null;
|
||||||
if (flow != null) {
|
if (flow != null) {
|
||||||
|
|
@ -718,8 +735,11 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void responseReceived(SubscribeToShardResponse response) {
|
public void responseReceived(SubscribeToShardResponse response) {
|
||||||
log.debug("{}: [SubscriptionLifetime]: (RecordFlow#responseReceived) @ {} id: {} -- Response received",
|
log.debug("{}: [SubscriptionLifetime]: (RecordFlow#responseReceived) @ {} id: {} -- Response received. Request id - {}",
|
||||||
parent.shardId, connectionStartedAt, subscribeToShardId);
|
parent.shardId, connectionStartedAt, subscribeToShardId, response.responseMetadata().requestId());
|
||||||
|
|
||||||
|
final RequestDetails requestDetails = new RequestDetails(response.responseMetadata().requestId(), connectionStartedAt.toString());
|
||||||
|
parent.setLastSuccessfulRequestDetails(requestDetails);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
@ -781,10 +801,9 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
|
||||||
.add(new RecordsRetrievedContext(Either.right(subscriptionShutdownEvent), this, Instant.now()));
|
.add(new RecordsRetrievedContext(Either.right(subscriptionShutdownEvent), this, Instant.now()));
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.warn(
|
log.warn(
|
||||||
"{}: Unable to enqueue the {} shutdown event due to capacity restrictions in delivery queue with remaining capacity {}. Ignoring. ",
|
"{}: Unable to enqueue the {} shutdown event due to capacity restrictions in delivery queue with remaining capacity {}. Ignoring. Last successful request details -- {}",
|
||||||
parent.shardId, subscriptionShutdownEvent.getEventIdentifier(),
|
parent.shardId, subscriptionShutdownEvent.getEventIdentifier(), parent.recordsDeliveryQueue.remainingCapacity(),
|
||||||
parent.recordsDeliveryQueue.remainingCapacity(),
|
parent.lastSuccessfulRequestDetails, subscriptionShutdownEvent.getShutdownEventThrowableOptional());
|
||||||
subscriptionShutdownEvent.getShutdownEventThrowableOptional());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -800,13 +819,14 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
|
||||||
// the
|
// the
|
||||||
// subscription, which was cancelled for a reason (usually queue overflow).
|
// subscription, which was cancelled for a reason (usually queue overflow).
|
||||||
//
|
//
|
||||||
log.warn("{}: complete called on a cancelled subscription. Ignoring completion", parent.shardId);
|
log.warn("{}: complete called on a cancelled subscription. Ignoring completion. Last successful request details -- {}",
|
||||||
|
parent.shardId, parent.lastSuccessfulRequestDetails);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (this.isDisposed) {
|
if (this.isDisposed) {
|
||||||
log.warn(
|
log.warn(
|
||||||
"{}: [SubscriptionLifetime]: (RecordFlow#complete) @ {} id: {} -- This flow has been disposed not dispatching completion",
|
"{}: [SubscriptionLifetime]: (RecordFlow#complete) @ {} id: {} -- This flow has been disposed not dispatching completion. Last successful request details -- {}",
|
||||||
parent.shardId, connectionStartedAt, subscribeToShardId);
|
parent.shardId, connectionStartedAt, subscribeToShardId, parent.lastSuccessfulRequestDetails);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -942,7 +962,6 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
|
||||||
log.debug(
|
log.debug(
|
||||||
"{}: [SubscriptionLifetime]: (RecordSubscription#onComplete) @ {} id: {} -- Allowing RecordFlow to call onComplete",
|
"{}: [SubscriptionLifetime]: (RecordSubscription#onComplete) @ {} id: {} -- Allowing RecordFlow to call onComplete",
|
||||||
parent.shardId, connectionStartedAt, subscribeToShardId);
|
parent.shardId, connectionStartedAt, subscribeToShardId);
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -15,6 +15,7 @@
|
||||||
|
|
||||||
package software.amazon.kinesis.retrieval.polling;
|
package software.amazon.kinesis.retrieval.polling;
|
||||||
|
|
||||||
|
import java.time.Instant;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
|
@ -23,6 +24,7 @@ import software.amazon.kinesis.annotations.KinesisClientInternalApi;
|
||||||
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
|
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
|
||||||
|
|
||||||
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
|
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
|
||||||
|
import software.amazon.kinesis.common.RequestDetails;
|
||||||
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
|
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
|
||||||
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
|
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
|
||||||
import software.amazon.kinesis.retrieval.KinesisClientRecord;
|
import software.amazon.kinesis.retrieval.KinesisClientRecord;
|
||||||
|
|
@ -40,6 +42,7 @@ public class BlockingRecordsPublisher implements RecordsPublisher {
|
||||||
private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
|
private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
|
||||||
|
|
||||||
private Subscriber<? super RecordsRetrieved> subscriber;
|
private Subscriber<? super RecordsRetrieved> subscriber;
|
||||||
|
private RequestDetails lastSuccessfulRequestDetails = new RequestDetails();
|
||||||
|
|
||||||
public BlockingRecordsPublisher(final int maxRecordsPerCall,
|
public BlockingRecordsPublisher(final int maxRecordsPerCall,
|
||||||
final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) {
|
final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) {
|
||||||
|
|
@ -57,6 +60,8 @@ public class BlockingRecordsPublisher implements RecordsPublisher {
|
||||||
|
|
||||||
public ProcessRecordsInput getNextResult() {
|
public ProcessRecordsInput getNextResult() {
|
||||||
GetRecordsResponse getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall);
|
GetRecordsResponse getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall);
|
||||||
|
final RequestDetails getRecordsRequestDetails = new RequestDetails(getRecordsResult.responseMetadata().requestId(), Instant.now().toString());
|
||||||
|
setLastSuccessfulRequestDetails(getRecordsRequestDetails);
|
||||||
List<KinesisClientRecord> records = getRecordsResult.records().stream()
|
List<KinesisClientRecord> records = getRecordsResult.records().stream()
|
||||||
.map(KinesisClientRecord::fromRecord).collect(Collectors.toList());
|
.map(KinesisClientRecord::fromRecord).collect(Collectors.toList());
|
||||||
return ProcessRecordsInput.builder()
|
return ProcessRecordsInput.builder()
|
||||||
|
|
@ -70,6 +75,15 @@ public class BlockingRecordsPublisher implements RecordsPublisher {
|
||||||
getRecordsRetrievalStrategy.shutdown();
|
getRecordsRetrievalStrategy.shutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void setLastSuccessfulRequestDetails(RequestDetails requestDetails) {
|
||||||
|
lastSuccessfulRequestDetails = requestDetails;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RequestDetails getLastSuccessfulRequestDetails() {
|
||||||
|
return lastSuccessfulRequestDetails;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void subscribe(Subscriber<? super RecordsRetrieved> s) {
|
public void subscribe(Subscriber<? super RecordsRetrieved> s) {
|
||||||
subscriber = s;
|
subscriber = s;
|
||||||
|
|
|
||||||
|
|
@ -46,6 +46,7 @@ import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
|
||||||
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
|
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
|
||||||
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
|
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
|
||||||
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
|
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
|
||||||
|
import software.amazon.kinesis.common.RequestDetails;
|
||||||
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
|
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
|
||||||
import software.amazon.kinesis.metrics.MetricsFactory;
|
import software.amazon.kinesis.metrics.MetricsFactory;
|
||||||
import software.amazon.kinesis.metrics.MetricsLevel;
|
import software.amazon.kinesis.metrics.MetricsLevel;
|
||||||
|
|
@ -98,6 +99,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
|
||||||
private boolean wasReset = false;
|
private boolean wasReset = false;
|
||||||
|
|
||||||
private Instant lastEventDeliveryTime = Instant.EPOCH;
|
private Instant lastEventDeliveryTime = Instant.EPOCH;
|
||||||
|
private final RequestDetails lastSuccessfulRequestDetails = new RequestDetails();
|
||||||
|
|
||||||
@Data
|
@Data
|
||||||
@Accessors(fluent = true)
|
@Accessors(fluent = true)
|
||||||
|
|
@ -260,6 +262,11 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
|
||||||
started = false;
|
started = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RequestDetails getLastSuccessfulRequestDetails() {
|
||||||
|
return lastSuccessfulRequestDetails;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void restartFrom(RecordsRetrieved recordsRetrieved) {
|
public void restartFrom(RecordsRetrieved recordsRetrieved) {
|
||||||
if (!(recordsRetrieved instanceof PrefetchRecordsRetrieved)) {
|
if (!(recordsRetrieved instanceof PrefetchRecordsRetrieved)) {
|
||||||
|
|
|
||||||
|
|
@ -35,6 +35,7 @@ import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Optional;
|
||||||
import java.util.concurrent.CyclicBarrier;
|
import java.util.concurrent.CyclicBarrier;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
|
|
@ -61,6 +62,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
import lombok.NonNull;
|
import lombok.NonNull;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
|
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
|
||||||
|
import software.amazon.kinesis.common.RequestDetails;
|
||||||
import software.amazon.kinesis.leases.ShardInfo;
|
import software.amazon.kinesis.leases.ShardInfo;
|
||||||
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
|
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
|
||||||
import software.amazon.kinesis.retrieval.KinesisClientRecord;
|
import software.amazon.kinesis.retrieval.KinesisClientRecord;
|
||||||
|
|
@ -77,6 +79,8 @@ public class ShardConsumerSubscriberTest {
|
||||||
|
|
||||||
private static final String TERMINAL_MARKER = "Terminal";
|
private static final String TERMINAL_MARKER = "Terminal";
|
||||||
|
|
||||||
|
private final RequestDetails lastSuccessfulRequestDetails = new RequestDetails();
|
||||||
|
|
||||||
@Mock
|
@Mock
|
||||||
private ShardConsumer shardConsumer;
|
private ShardConsumer shardConsumer;
|
||||||
@Mock
|
@Mock
|
||||||
|
|
@ -556,6 +560,11 @@ public class ShardConsumerSubscriberTest {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RequestDetails getLastSuccessfulRequestDetails() {
|
||||||
|
return lastSuccessfulRequestDetails;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void subscribe(Subscriber<? super RecordsRetrieved> s) {
|
public void subscribe(Subscriber<? super RecordsRetrieved> s) {
|
||||||
subscriber = s;
|
subscriber = s;
|
||||||
|
|
|
||||||
|
|
@ -70,6 +70,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
|
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
|
||||||
|
import software.amazon.kinesis.common.RequestDetails;
|
||||||
import software.amazon.kinesis.leases.ShardInfo;
|
import software.amazon.kinesis.leases.ShardInfo;
|
||||||
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
|
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
|
||||||
import software.amazon.kinesis.lifecycle.events.TaskExecutionListenerInput;
|
import software.amazon.kinesis.lifecycle.events.TaskExecutionListenerInput;
|
||||||
|
|
@ -88,6 +89,7 @@ public class ShardConsumerTest {
|
||||||
|
|
||||||
private final String shardId = "shardId-0-0";
|
private final String shardId = "shardId-0-0";
|
||||||
private final String concurrencyToken = "TestToken";
|
private final String concurrencyToken = "TestToken";
|
||||||
|
private final RequestDetails lastSuccessfulRequestDetails = new RequestDetails();
|
||||||
private ShardInfo shardInfo;
|
private ShardInfo shardInfo;
|
||||||
private TaskExecutionListenerInput initialTaskInput;
|
private TaskExecutionListenerInput initialTaskInput;
|
||||||
private TaskExecutionListenerInput processTaskInput;
|
private TaskExecutionListenerInput processTaskInput;
|
||||||
|
|
@ -209,6 +211,11 @@ public class ShardConsumerTest {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RequestDetails getLastSuccessfulRequestDetails() {
|
||||||
|
return lastSuccessfulRequestDetails;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void subscribe(Subscriber<? super RecordsRetrieved> s) {
|
public void subscribe(Subscriber<? super RecordsRetrieved> s) {
|
||||||
subscriber = s;
|
subscriber = s;
|
||||||
|
|
|
||||||
|
|
@ -54,7 +54,6 @@ import java.util.stream.Collectors;
|
||||||
import java.util.stream.IntStream;
|
import java.util.stream.IntStream;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
|
@ -76,6 +75,7 @@ import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
|
||||||
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
|
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
|
||||||
import software.amazon.awssdk.services.kinesis.model.Record;
|
import software.amazon.awssdk.services.kinesis.model.Record;
|
||||||
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
|
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
|
||||||
|
import software.amazon.kinesis.common.RequestDetails;
|
||||||
import software.amazon.kinesis.lifecycle.ShardConsumerNotifyingSubscriber;
|
import software.amazon.kinesis.lifecycle.ShardConsumerNotifyingSubscriber;
|
||||||
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
|
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
|
||||||
import software.amazon.kinesis.metrics.NullMetricsFactory;
|
import software.amazon.kinesis.metrics.NullMetricsFactory;
|
||||||
|
|
@ -115,6 +115,7 @@ public class PrefetchRecordsPublisherTest {
|
||||||
private String operation = "ProcessTask";
|
private String operation = "ProcessTask";
|
||||||
private GetRecordsResponse getRecordsResponse;
|
private GetRecordsResponse getRecordsResponse;
|
||||||
private Record record;
|
private Record record;
|
||||||
|
private RequestDetails requestDetails;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setup() {
|
public void setup() {
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue