Fix to prevent the onNext event from going to a stale subscription when a restart happens in the poller (#606)
* Fix to prevent the onNext event from going to a stale subscription when a restart happens in the poller
* Isolating session variables into a new class; replacing the thread-control-shifting logic for publishing with monitor-based control
* Refactoring based on review comments
* Addressing review comments on unit test cases
parent a94dc7d61d
commit 3fead19df7
3 changed files with 189 additions and 154 deletions
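Before the diff, a minimal, self-contained sketch of the pattern this change moves to. All class and method names below are illustrative stand-ins, not KCL's actual API: a single monitor serializes enqueueing, dispatch, and ack handling; a restart zeroes the demand and drops buffered events; and a per-event dispatched flag keeps the contending threads from delivering the same event twice.

    import java.util.ArrayDeque;
    import java.util.Queue;
    import java.util.concurrent.atomic.AtomicLong;
    import java.util.function.Consumer;

    class PublisherSketch {
        static final class Event {
            final String payload;
            boolean dispatched; // set once the event has been handed to the subscriber
            Event(String payload) { this.payload = payload; }
        }

        private final Queue<Event> queue = new ArrayDeque<>();
        private final AtomicLong demand = new AtomicLong(0); // outstanding request(n)
        private Consumer<Event> subscriber;

        // A new subscription replaces the stale one and communicates fresh demand.
        synchronized void subscribe(Consumer<Event> s, long n) {
            subscriber = s;
            demand.set(n);
            drain();
        }

        // Restart: zero the demand and drop buffered events so nothing queued
        // for the old subscription can be pushed to the new one.
        synchronized void restart() {
            demand.set(0);
            queue.clear();
        }

        // The prefetching thread enqueues and then tries to drain.
        synchronized void offer(Event e) {
            queue.add(e);
            drain();
        }

        // Ack for the head of the queue: evict it, consume one unit of demand,
        // then try to dispatch the next buffered event.
        synchronized void ack() {
            if (queue.poll() != null) {
                demand.decrementAndGet();
                drain();
            }
        }

        // Dispatch the head only if there is demand and it was not already sent;
        // the dispatched flag keeps concurrent drain callers from re-sending it.
        private void drain() {
            Event head = queue.peek();
            if (head != null && !head.dispatched && demand.get() > 0 && subscriber != null) {
                head.dispatched = true;
                subscriber.accept(head);
            }
        }
    }

The real publisher spreads this over synchronized methods plus a ReentrantReadWriteLock around restarts; the sketch collapses it to one monitor to show why a stale subscriber can no longer receive onNext after a restart: its demand is gone and its buffered events are cleared.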
PrefetchRecordsPublisher.java

@@ -22,11 +22,13 @@ import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;

+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.Setter;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.Validate;
 import org.reactivestreams.Subscriber;
@@ -70,17 +72,12 @@ import static software.amazon.kinesis.common.DiagnosticUtils.takeDelayedDeliveryActionIfRequired;
  * the record processor is blocked till records are retrieved from Kinesis.
  *
  * There are three threads namely publisher, demand-notifier and ack-notifier which will contend to drain the events
- * to the Subscriber (ShardConsumer in KCL). The publisher/demand-notifier thread gains the control to drain only when
- * there is no pending event in the prefetch queue waiting for the ack. Otherwise, it will be the ack-notifier thread
- * which will drain an event on the receipt of an ack.
- *
+ * to the Subscriber (ShardConsumer in KCL).
  */
 @Slf4j
 @KinesisClientInternalApi
 public class PrefetchRecordsPublisher implements RecordsPublisher {
     private static final String EXPIRED_ITERATOR_METRIC = "ExpiredIterator";
-    @VisibleForTesting
-    LinkedBlockingQueue<PrefetchRecordsRetrieved> getRecordsResultQueue;
     private int maxPendingProcessRecordsInput;
     private int maxByteSize;
     private int maxRecordsCount;
@@ -91,26 +88,101 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
     private final long idleMillisBetweenCalls;
     private Instant lastSuccessfulCall;
     private final DefaultGetRecordsCacheDaemon defaultGetRecordsCacheDaemon;
-    private PrefetchCounters prefetchCounters;
     private boolean started = false;
     private final String operation;
-    private final KinesisDataFetcher dataFetcher;
     private final String shardId;

     private Subscriber<? super RecordsRetrieved> subscriber;
-    private final AtomicLong requestedResponses = new AtomicLong(0);
-    private String highestSequenceNumber;
-    private InitialPositionInStreamExtended initialPositionInStreamExtended;
+    @VisibleForTesting @Getter
+    private final PublisherSession publisherSession;

     private final ReentrantReadWriteLock resetLock = new ReentrantReadWriteLock();
     private boolean wasReset = false;

     private Instant lastEventDeliveryTime = Instant.EPOCH;
-    // This flag controls who should drain the next request in the prefetch queue.
-    // When set to false, the publisher and demand-notifier thread would have the control.
-    // When set to true, the event-notifier thread would have the control.
-    private AtomicBoolean shouldDrainEventOnlyOnAck = new AtomicBoolean(false);
+
+    @Data
+    @Accessors(fluent = true)
+    static final class PublisherSession {
+        private final AtomicLong requestedResponses = new AtomicLong(0);
+        @VisibleForTesting @Getter
+        private final LinkedBlockingQueue<PrefetchRecordsRetrieved> prefetchRecordsQueue;
+        private final PrefetchCounters prefetchCounters;
+        private final KinesisDataFetcher dataFetcher;
+        private InitialPositionInStreamExtended initialPositionInStreamExtended;
+        private String highestSequenceNumber;
+
+        // Initialize the session on publisher start.
+        void init(ExtendedSequenceNumber extendedSequenceNumber,
+                InitialPositionInStreamExtended initialPositionInStreamExtended) {
+            this.initialPositionInStreamExtended = initialPositionInStreamExtended;
+            this.highestSequenceNumber = extendedSequenceNumber.sequenceNumber();
+            this.dataFetcher.initialize(extendedSequenceNumber, initialPositionInStreamExtended);
+        }
+
+        // Reset the session when publisher restarts.
+        void reset(PrefetchRecordsRetrieved prefetchRecordsRetrieved) {
+            // Reset the demand from ShardConsumer, to prevent this publisher from delivering events to stale RX-Java
+            // Subscriber. Publishing will be unblocked when the demand is communicated by the new Rx-Java subscriber.
+            requestedResponses.set(0);
+            // Clear the queue, so that the publisher repopulates the queue based on sequence number from subscriber.
+            prefetchRecordsQueue.clear();
+            prefetchCounters.reset();
+            highestSequenceNumber = prefetchRecordsRetrieved.lastBatchSequenceNumber();
+            dataFetcher.resetIterator(prefetchRecordsRetrieved.shardIterator(), highestSequenceNumber,
+                    initialPositionInStreamExtended);
+        }
+
+        // Handle records delivery ack and execute nextEventDispatchAction.
+        // This method is not thread-safe and needs to be called after acquiring a monitor.
+        void handleRecordsDeliveryAck(RecordsDeliveryAck recordsDeliveryAck, String shardId, Runnable nextEventDispatchAction) {
+            final PrefetchRecordsRetrieved recordsToCheck = peekNextRecord();
+            // Verify if the ack matches the head of the queue and evict it.
+            if (recordsToCheck != null && recordsToCheck.batchUniqueIdentifier().equals(recordsDeliveryAck.batchUniqueIdentifier())) {
+                evictPublishedRecordAndUpdateDemand(shardId);
+                nextEventDispatchAction.run();
+            } else {
+                // Log and ignore any other ack received. As long as an ack is received for head of the queue
+                // we are good. Any stale or future ack received can be ignored, though the latter is not feasible
+                // to happen.
+                final BatchUniqueIdentifier peekedBatchUniqueIdentifier =
+                        recordsToCheck == null ? null : recordsToCheck.batchUniqueIdentifier();
+                log.info("{} : Received a stale notification with id {} instead of expected id {} at {}. Will ignore.",
+                        shardId, recordsDeliveryAck.batchUniqueIdentifier(), peekedBatchUniqueIdentifier, Instant.now());
+            }
+        }
+
+        // Evict the published record from the prefetch queue.
+        // This method is not thread-safe and needs to be called after acquiring a monitor.
+        @VisibleForTesting
+        RecordsRetrieved evictPublishedRecordAndUpdateDemand(String shardId) {
+            final PrefetchRecordsRetrieved result = prefetchRecordsQueue.poll();
+            if (result != null) {
+                updateDemandTrackersOnPublish(result);
+            } else {
+                log.info(
+                        "{}: No record batch found while evicting from the prefetch queue. This indicates the prefetch buffer"
+                                + "was reset.", shardId);
+            }
+            return result;
+        }
+
+        boolean hasDemandToPublish() {
+            return requestedResponses.get() > 0;
+        }
+
+        PrefetchRecordsRetrieved peekNextRecord() {
+            return prefetchRecordsQueue.peek();
+        }
+
+        boolean offerRecords(PrefetchRecordsRetrieved recordsRetrieved, long idleMillisBetweenCalls) throws InterruptedException {
+            return prefetchRecordsQueue.offer(recordsRetrieved, idleMillisBetweenCalls, TimeUnit.MILLISECONDS);
+        }
+
+        private void updateDemandTrackersOnPublish(PrefetchRecordsRetrieved result) {
+            prefetchCounters.removed(result.processRecordsInput);
+            requestedResponses.decrementAndGet();
+        }
+    }

     /**
      * Constructor for the PrefetchRecordsPublisher. This cache prefetches records from Kinesis and stores them in a
@@ -140,15 +212,14 @@
         this.maxPendingProcessRecordsInput = maxPendingProcessRecordsInput;
         this.maxByteSize = maxByteSize;
         this.maxRecordsCount = maxRecordsCount;
-        this.getRecordsResultQueue = new LinkedBlockingQueue<>(this.maxPendingProcessRecordsInput);
-        this.prefetchCounters = new PrefetchCounters();
+        this.publisherSession = new PublisherSession(new LinkedBlockingQueue<>(this.maxPendingProcessRecordsInput),
+                new PrefetchCounters(), this.getRecordsRetrievalStrategy.getDataFetcher());
         this.executorService = executorService;
         this.metricsFactory = new ThreadSafeMetricsDelegatingFactory(metricsFactory);
         this.idleMillisBetweenCalls = idleMillisBetweenCalls;
         this.defaultGetRecordsCacheDaemon = new DefaultGetRecordsCacheDaemon();
         Validate.notEmpty(operation, "Operation cannot be empty");
         this.operation = operation;
-        this.dataFetcher = this.getRecordsRetrievalStrategy.getDataFetcher();
         this.shardId = shardId;
     }
@@ -158,9 +229,7 @@
             throw new IllegalStateException("ExecutorService has been shutdown.");
         }

-        this.initialPositionInStreamExtended = initialPositionInStreamExtended;
-        highestSequenceNumber = extendedSequenceNumber.sequenceNumber();
-        dataFetcher.initialize(extendedSequenceNumber, initialPositionInStreamExtended);
+        publisherSession.init(extendedSequenceNumber, initialPositionInStreamExtended);

         if (!started) {
             log.info("{} : Starting prefetching thread.", shardId);
@@ -179,25 +248,9 @@
         }
     }

-    private RecordsRetrieved peekNextResult() {
+    private PrefetchRecordsRetrieved peekNextResult() {
         throwOnIllegalState();
-        final PrefetchRecordsRetrieved result = getRecordsResultQueue.peek();
-        return result == null ? result : result.prepareForPublish();
-    }
-
-    @VisibleForTesting
-    RecordsRetrieved pollNextResultAndUpdatePrefetchCounters() {
-        throwOnIllegalState();
-        final PrefetchRecordsRetrieved result = getRecordsResultQueue.poll();
-        if (result != null) {
-            prefetchCounters.removed(result.processRecordsInput);
-            requestedResponses.decrementAndGet();
-        } else {
-            log.info(
-                    "{}: No record batch found while evicting from the prefetch queue. This indicates the prefetch buffer"
-                            + "was reset.", shardId);
-        }
-        return result;
+        return publisherSession.peekNextRecord();
     }

     @Override
@@ -213,21 +266,9 @@
             throw new IllegalArgumentException(
                     "Provided RecordsRetrieved was not produced by the PrefetchRecordsPublisher");
         }
-        PrefetchRecordsRetrieved prefetchRecordsRetrieved = (PrefetchRecordsRetrieved) recordsRetrieved;
         resetLock.writeLock().lock();
         try {
-            getRecordsResultQueue.clear();
-
-            // Give the drain control to publisher/demand-notifier thread.
-            log.debug("{} : Publisher thread takes over the draining control. Queue Size : {}, Demand : {}", shardId,
-                    getRecordsResultQueue.size(), requestedResponses.get());
-            shouldDrainEventOnlyOnAck.set(false);
-
-            prefetchCounters.reset();
-
-            highestSequenceNumber = prefetchRecordsRetrieved.lastBatchSequenceNumber();
-            dataFetcher.resetIterator(prefetchRecordsRetrieved.shardIterator(), highestSequenceNumber,
-                    initialPositionInStreamExtended);
+            publisherSession.reset((PrefetchRecordsRetrieved)recordsRetrieved);
             wasReset = true;
         } finally {
             resetLock.writeLock().unlock();
@@ -240,42 +281,23 @@
         subscriber.onSubscribe(new Subscription() {
             @Override
             public void request(long n) {
-                requestedResponses.addAndGet(n);
-                drainQueueForRequestsIfAllowed();
+                publisherSession.requestedResponses().addAndGet(n);
+                drainQueueForRequests();
             }

             @Override
             public void cancel() {
-                requestedResponses.set(0);
+                // When the subscription is cancelled, the demand is set to 0, to prevent further
+                // records from being dispatched to the consumer/subscriber. The publisher session state will be
+                // reset when restartFrom(*) is called by the consumer/subscriber.
+                publisherSession.requestedResponses().set(0);
             }
         });
     }

     @Override
     public synchronized void notify(RecordsDeliveryAck recordsDeliveryAck) {
-        final RecordsRetrieved recordsToCheck = peekNextResult();
-        // Verify if the ack matches the head of the queue and evict it.
-        if (recordsToCheck != null && recordsToCheck.batchUniqueIdentifier()
-                .equals(recordsDeliveryAck.batchUniqueIdentifier())) {
-            pollNextResultAndUpdatePrefetchCounters();
-            // Upon evicting, check if queue is empty. if yes, then give the drain control back to publisher thread.
-            if (getRecordsResultQueue.isEmpty()) {
-                log.debug("{} : Publisher thread takes over the draining control. Queue Size : {}, Demand : {}",
-                        shardId, getRecordsResultQueue.size(), requestedResponses.get());
-                shouldDrainEventOnlyOnAck.set(false);
-            } else {
-                // Else attempt to drain the queue.
-                drainQueueForRequests();
-            }
-        } else {
-            // Log and ignore any other ack received. As long as an ack is received for head of the queue
-            // we are good. Any stale or future ack received can be ignored, though the latter is not feasible
-            // to happen.
-            final BatchUniqueIdentifier peekedBatchUniqueIdentifier =
-                    recordsToCheck == null ? null : recordsToCheck.batchUniqueIdentifier();
-            log.info("{} : Received a stale notification with id {} instead of expected id {} at {}. Will ignore.",
-                    shardId, recordsDeliveryAck.batchUniqueIdentifier(), peekedBatchUniqueIdentifier, Instant.now());
-        }
+        publisherSession.handleRecordsDeliveryAck(recordsDeliveryAck, shardId, () -> drainQueueForRequests());
         // Take action based on the time spent by the event in queue.
         takeDelayedDeliveryActionIfRequired(shardId, lastEventDeliveryTime, log);
     }
@@ -283,7 +305,7 @@
     // Note : Do not make this method synchronous as notify() will not be able to evict any entry from the queue.
     private void addArrivedRecordsInput(PrefetchRecordsRetrieved recordsRetrieved) throws InterruptedException {
         wasReset = false;
-        while (!getRecordsResultQueue.offer(recordsRetrieved, idleMillisBetweenCalls, TimeUnit.MILLISECONDS)) {
+        while (!publisherSession.offerRecords(recordsRetrieved, idleMillisBetweenCalls)) {
             //
             // Unlocking the read lock, and then reacquiring the read lock, should allow any waiters on the write lock a
             // chance to run. If the write lock is acquired by restartFrom than the readLock will now block until
@@ -296,43 +318,28 @@
                 throw new PositionResetException();
             }
         }
-        prefetchCounters.added(recordsRetrieved.processRecordsInput);
-    }
-
-    /**
-     * Method that will be called by the 'publisher thread' and the 'demand notifying thread',
-     * to drain the events if the 'event notifying thread' do not have the control.
-     */
-    private synchronized void drainQueueForRequestsIfAllowed() {
-        if (!shouldDrainEventOnlyOnAck.get()) {
-            drainQueueForRequests();
-        }
+        publisherSession.prefetchCounters().added(recordsRetrieved.processRecordsInput);
     }

     /**
      * Method to drain the queue based on the demand and the events availability in the queue.
      */
-    private synchronized void drainQueueForRequests() {
-        final RecordsRetrieved recordsToDeliver = peekNextResult();
+    @VisibleForTesting
+    synchronized void drainQueueForRequests() {
+        final PrefetchRecordsRetrieved recordsToDeliver = peekNextResult();
         // If there is an event available to drain and if there is at least one demand,
         // then schedule it for delivery
-        if (requestedResponses.get() > 0 && recordsToDeliver != null) {
+        if (publisherSession.hasDemandToPublish() && canDispatchRecord(recordsToDeliver)) {
+            subscriber.onNext(recordsToDeliver.prepareForPublish());
+            recordsToDeliver.dispatched();
             lastEventDeliveryTime = Instant.now();
-            subscriber.onNext(recordsToDeliver);
-            if (!shouldDrainEventOnlyOnAck.get()) {
-                log.debug("{} : Notifier thread takes over the draining control. Queue Size : {}, Demand : {}", shardId,
-                        getRecordsResultQueue.size(), requestedResponses.get());
-                shouldDrainEventOnlyOnAck.set(true);
-            }
-        } else {
-            // Since we haven't scheduled the event delivery, give the drain control back to publisher/demand-notifier
-            // thread.
-            if (shouldDrainEventOnlyOnAck.get()) {
-                log.debug("{} : Publisher thread takes over the draining control. Queue Size : {}, Demand : {}",
-                        shardId, getRecordsResultQueue.size(), requestedResponses.get());
-                shouldDrainEventOnlyOnAck.set(false);
-            }
         }
     }

+    // This method is thread-safe and informs the caller on whether this record is eligible to be dispatched.
+    // If this record was already dispatched earlier, then this method would return false.
+    private static boolean canDispatchRecord(PrefetchRecordsRetrieved recordsToDeliver) {
+        return recordsToDeliver != null && !recordsToDeliver.isDispatched();
+    }
+
     @Accessors(fluent = true)
@@ -343,6 +350,7 @@
         final String lastBatchSequenceNumber;
         final String shardIterator;
         final BatchUniqueIdentifier batchUniqueIdentifier;
+        @Accessors() @Setter(AccessLevel.NONE) boolean dispatched = false;

         PrefetchRecordsRetrieved prepareForPublish() {
             return new PrefetchRecordsRetrieved(processRecordsInput.toBuilder().cacheExitTime(Instant.now()).build(),
@@ -354,6 +362,9 @@
             return batchUniqueIdentifier;
         }

+        // Indicates if this record batch was already dispatched for delivery.
+        void dispatched() { dispatched = true; }
+
         /**
          * Generate batch unique identifier for PrefetchRecordsRetrieved, where flow will be empty.
          * @return BatchUniqueIdentifier
@@ -362,10 +373,11 @@
             return new BatchUniqueIdentifier(UUID.randomUUID().toString(),
                     StringUtils.EMPTY);
         }

     }

     private String calculateHighestSequenceNumber(ProcessRecordsInput processRecordsInput) {
-        String result = this.highestSequenceNumber;
+        String result = publisherSession.highestSequenceNumber();
         if (processRecordsInput.records() != null && !processRecordsInput.records().isEmpty()) {
             result = processRecordsInput.records().get(processRecordsInput.records().size() - 1).sequenceNumber();
         }
@@ -404,7 +416,7 @@

     private void makeRetrievalAttempt() {
         MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, operation);
-        if (prefetchCounters.shouldGetNewRecords()) {
+        if (publisherSession.prefetchCounters().shouldGetNewRecords()) {
             try {
                 sleepBeforeNextCall();
                 GetRecordsResponse getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall);
@@ -419,13 +431,12 @@
                         .isAtShardEnd(getRecordsRetrievalStrategy.getDataFetcher().isShardEndReached())
                         .build();

-                highestSequenceNumber = calculateHighestSequenceNumber(processRecordsInput);
                 PrefetchRecordsRetrieved recordsRetrieved = new PrefetchRecordsRetrieved(processRecordsInput,
-                        highestSequenceNumber, getRecordsResult.nextShardIterator(),
+                        calculateHighestSequenceNumber(processRecordsInput), getRecordsResult.nextShardIterator(),
                         PrefetchRecordsRetrieved.generateBatchUniqueIdentifier());
-                highestSequenceNumber = recordsRetrieved.lastBatchSequenceNumber;
+                publisherSession.highestSequenceNumber(recordsRetrieved.lastBatchSequenceNumber);
                 addArrivedRecordsInput(recordsRetrieved);
-                drainQueueForRequestsIfAllowed();
+                drainQueueForRequests();
             } catch (PositionResetException pse) {
                 throw pse;
             } catch (RetryableRetrievalException rre) {
@@ -438,7 +449,7 @@

                     scope.addData(EXPIRED_ITERATOR_METRIC, 1, StandardUnit.COUNT, MetricsLevel.SUMMARY);

-                    dataFetcher.restartIterator();
+                    publisherSession.dataFetcher().restartIterator();
                 } catch (SdkException e) {
                     log.error("{} : Exception thrown while fetching records from Kinesis", shardId, e);
                 } catch (Throwable e) {
@@ -454,7 +465,7 @@
                 // Consumer isn't ready to receive new records will allow prefetch counters to pause
                 //
                 try {
-                    prefetchCounters.waitForConsumer();
+                    publisherSession.prefetchCounters().waitForConsumer();
                 } catch (InterruptedException ie) {
                     log.info("{} : Thread was interrupted while waiting for the consumer. " +
                             "Shutdown has probably been started", shardId);
@@ -523,7 +534,7 @@

         @Override
         public String toString() {
-            return String.format("{ Requests: %d, Records: %d, Bytes: %d }", getRecordsResultQueue.size(), size,
+            return String.format("{ Requests: %d, Records: %d, Bytes: %d }", publisherSession.prefetchRecordsQueue().size(), size,
                     byteSize);
         }
     }
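One note on the ack path introduced above: handleRecordsDeliveryAck evicts only when the ack matches the batch at the head of the prefetch queue, and logs and ignores everything else. A stripped-down illustration of that guard, with batch identifiers reduced to plain strings (not the actual KCL types):

    import java.util.ArrayDeque;
    import java.util.Deque;

    final class AckMatchingSketch {
        private final Deque<String> pendingBatchIds = new ArrayDeque<>();

        synchronized void enqueue(String batchId) {
            pendingBatchIds.add(batchId);
        }

        // Evict only when the ack matches the head of the queue. A stale (or,
        // in theory, future) ack is ignored, so a duplicate or late ack cannot
        // evict a batch that was never delivered.
        synchronized boolean onAck(String ackedBatchId) {
            String head = pendingBatchIds.peek();
            if (head != null && head.equals(ackedBatchId)) {
                pendingBatchIds.poll();
                return true; // caller may now dispatch the next buffered event
            }
            System.out.printf("Ignoring stale ack %s; expected %s%n", ackedBatchId, head);
            return false;
        }
    }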

PrefetchRecordsPublisherIntegrationTest.java
@@ -62,6 +62,7 @@ import software.amazon.kinesis.metrics.MetricsFactory;
 import software.amazon.kinesis.metrics.NullMetricsFactory;
 import software.amazon.kinesis.retrieval.DataFetcherResult;
 import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
+import software.amazon.kinesis.retrieval.RecordsRetrieved;
 import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;

 /**
@@ -122,14 +123,14 @@ public class PrefetchRecordsPublisherIntegrationTest {
         getRecordsCache.start(extendedSequenceNumber, initialPosition);
         sleep(IDLE_MILLIS_BETWEEN_CALLS);

-        ProcessRecordsInput processRecordsInput1 = blockUntilRecordsAvailable(getRecordsCache::pollNextResultAndUpdatePrefetchCounters, 1000L)
+        ProcessRecordsInput processRecordsInput1 = blockUntilRecordsAvailable(() -> evictPublishedEvent(getRecordsCache, "shardId"), 1000L)
                 .processRecordsInput();

         assertTrue(processRecordsInput1.records().isEmpty());
         assertEquals(processRecordsInput1.millisBehindLatest(), new Long(1000));
         assertNotNull(processRecordsInput1.cacheEntryTime());

-        ProcessRecordsInput processRecordsInput2 = blockUntilRecordsAvailable(getRecordsCache::pollNextResultAndUpdatePrefetchCounters, 1000L)
+        ProcessRecordsInput processRecordsInput2 = blockUntilRecordsAvailable(() -> evictPublishedEvent(getRecordsCache, "shardId"), 1000L)
                 .processRecordsInput();

         assertNotEquals(processRecordsInput1, processRecordsInput2);
@@ -140,11 +141,11 @@
         getRecordsCache.start(extendedSequenceNumber, initialPosition);
         sleep(MAX_SIZE * IDLE_MILLIS_BETWEEN_CALLS);

-        assertEquals(getRecordsCache.getRecordsResultQueue.size(), MAX_SIZE);
+        assertEquals(getRecordsCache.getPublisherSession().prefetchRecordsQueue().size(), MAX_SIZE);

-        ProcessRecordsInput processRecordsInput1 = blockUntilRecordsAvailable(getRecordsCache::pollNextResultAndUpdatePrefetchCounters, 1000L)
+        ProcessRecordsInput processRecordsInput1 = blockUntilRecordsAvailable(() -> evictPublishedEvent(getRecordsCache, "shardId"), 1000L)
                 .processRecordsInput();
-        ProcessRecordsInput processRecordsInput2 = blockUntilRecordsAvailable(getRecordsCache::pollNextResultAndUpdatePrefetchCounters, 1000L)
+        ProcessRecordsInput processRecordsInput2 = blockUntilRecordsAvailable(() -> evictPublishedEvent(getRecordsCache, "shardId"), 1000L)
                 .processRecordsInput();

         assertNotEquals(processRecordsInput1, processRecordsInput2);
@@ -184,9 +185,9 @@

         sleep(IDLE_MILLIS_BETWEEN_CALLS);

-        ProcessRecordsInput p1 = getRecordsCache.pollNextResultAndUpdatePrefetchCounters().processRecordsInput();
+        ProcessRecordsInput p1 = evictPublishedEvent(getRecordsCache, shardId).processRecordsInput();

-        ProcessRecordsInput p2 = recordsPublisher2.pollNextResultAndUpdatePrefetchCounters().processRecordsInput();
+        ProcessRecordsInput p2 = evictPublishedEvent(recordsPublisher2, shardId).processRecordsInput();

         assertNotEquals(p1, p2);
         assertTrue(p1.records().isEmpty());
@@ -212,7 +213,7 @@
         getRecordsCache.start(extendedSequenceNumber, initialPosition);
         sleep(IDLE_MILLIS_BETWEEN_CALLS);

-        ProcessRecordsInput processRecordsInput = blockUntilRecordsAvailable(getRecordsCache::pollNextResultAndUpdatePrefetchCounters, 1000L)
+        ProcessRecordsInput processRecordsInput = blockUntilRecordsAvailable(() -> evictPublishedEvent(getRecordsCache, "shardId"), 1000L)
                 .processRecordsInput();

         assertNotNull(processRecordsInput);
@@ -220,6 +221,10 @@
         verify(dataFetcher).restartIterator();
     }

+    private RecordsRetrieved evictPublishedEvent(PrefetchRecordsPublisher publisher, String shardId) {
+        return publisher.getPublisherSession().evictPublishedRecordAndUpdateDemand(shardId);
+    }
+
     @After
     public void shutdown() {
         getRecordsCache.shutdown();

PrefetchRecordsPublisherTest.java
@@ -18,6 +18,7 @@ package software.amazon.kinesis.retrieval.polling;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -53,7 +54,7 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;

-import io.reactivex.plugins.RxJavaPlugins;
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.lang3.StringUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -131,7 +132,7 @@ public class PrefetchRecordsPublisherTest {
                 new NullMetricsFactory(),
                 operation,
                 "shardId");
-        spyQueue = spy(getRecordsCache.getRecordsResultQueue);
+        spyQueue = spy(getRecordsCache.getPublisherSession().prefetchRecordsQueue());
         records = spy(new ArrayList<>());
         getRecordsResponse = GetRecordsResponse.builder().records(records).build();

@@ -148,7 +149,7 @@
                 .map(KinesisClientRecord::fromRecord).collect(Collectors.toList());

         getRecordsCache.start(sequenceNumber, initialPosition);
-        ProcessRecordsInput result = blockUntilRecordsAvailable(getRecordsCache::pollNextResultAndUpdatePrefetchCounters, 1000L)
+        ProcessRecordsInput result = blockUntilRecordsAvailable(() -> evictPublishedEvent(getRecordsCache, "shardId"), 1000L)
                 .processRecordsInput();

         assertEquals(expectedRecords, result.records());
@@ -218,7 +219,7 @@
                 .map(KinesisClientRecord::fromRecord).collect(Collectors.toList());

         getRecordsCache.start(sequenceNumber, initialPosition);
-        ProcessRecordsInput processRecordsInput = getRecordsCache.pollNextResultAndUpdatePrefetchCounters().processRecordsInput();
+        ProcessRecordsInput processRecordsInput = evictPublishedEvent(getRecordsCache, "shardId").processRecordsInput();

         verify(executorService).execute(any());
         assertEquals(expectedRecords, processRecordsInput.records());
@@ -227,7 +228,7 @@

         sleep(2000);

-        ProcessRecordsInput processRecordsInput2 = getRecordsCache.pollNextResultAndUpdatePrefetchCounters().processRecordsInput();
+        ProcessRecordsInput processRecordsInput2 = evictPublishedEvent(getRecordsCache, "shardId").processRecordsInput();
         assertNotEquals(processRecordsInput, processRecordsInput2);
         assertEquals(expectedRecords, processRecordsInput2.records());
         assertNotEquals(processRecordsInput2.timeSpentInCache(), Duration.ZERO);
@@ -238,13 +239,13 @@
     @Test(expected = IllegalStateException.class)
     public void testGetNextRecordsWithoutStarting() {
         verify(executorService, times(0)).execute(any());
-        getRecordsCache.pollNextResultAndUpdatePrefetchCounters();
+        getRecordsCache.drainQueueForRequests();
     }

     @Test(expected = IllegalStateException.class)
     public void testCallAfterShutdown() {
         when(executorService.isShutdown()).thenReturn(true);
-        getRecordsCache.pollNextResultAndUpdatePrefetchCounters();
+        getRecordsCache.drainQueueForRequests();
     }

     @Test
@@ -257,7 +258,7 @@

         doNothing().when(dataFetcher).restartIterator();

-        blockUntilRecordsAvailable(() -> getRecordsCache.pollNextResultAndUpdatePrefetchCounters(), 1000L);
+        blockUntilRecordsAvailable(() -> evictPublishedEvent(getRecordsCache, "shardId"), 1000L);

         sleep(1000);

@@ -272,11 +273,11 @@

         getRecordsCache.start(sequenceNumber, initialPosition);

-        RecordsRetrieved records = blockUntilRecordsAvailable(getRecordsCache::pollNextResultAndUpdatePrefetchCounters, 1000);
+        RecordsRetrieved records = blockUntilRecordsAvailable(() -> evictPublishedEvent(getRecordsCache, "shardId"), 1000);
         assertThat(records.processRecordsInput().millisBehindLatest(), equalTo(response.millisBehindLatest()));
     }

-    @Test(timeout = 20000L)
+    @Test(timeout = 10000L)
     public void testNoDeadlockOnFullQueue() {
         //
         // Fixes https://github.com/awslabs/amazon-kinesis-client/issues/448
@@ -285,12 +286,12 @@
         // If the test times out before starting the subscriber it means something went wrong while filling the queue.
         // After the subscriber is started one of the things that can trigger a timeout is a deadlock.
         //
-        GetRecordsResponse response = GetRecordsResponse.builder().records(
-                Record.builder().data(SdkBytes.fromByteArray(new byte[] { 1, 2, 3 })).sequenceNumber("123").build())
-                .build();
-        when(getRecordsRetrievalStrategy.getRecords(anyInt())).thenReturn(response);

-        RxJavaPlugins.setErrorHandler(e -> e.printStackTrace());
+        final int[] sequenceNumberInResponse = { 0 };
+
+        when(getRecordsRetrievalStrategy.getRecords(anyInt())).thenAnswer( i -> GetRecordsResponse.builder().records(
+                Record.builder().data(SdkBytes.fromByteArray(new byte[] { 1, 2, 3 })).sequenceNumber(++sequenceNumberInResponse[0] + "").build())
+                .build());

         getRecordsCache.start(sequenceNumber, initialPosition);

@@ -298,18 +299,23 @@
         // Wait for the queue to fill up, and the publisher to block on adding items to the queue.
         //
         log.info("Waiting for queue to fill up");
-        while (getRecordsCache.getRecordsResultQueue.size() < MAX_SIZE) {
+        while (getRecordsCache.getPublisherSession().prefetchRecordsQueue().size() < MAX_SIZE) {
             Thread.yield();
         }

-        log.info("Queue is currently at {} starting subscriber", getRecordsCache.getRecordsResultQueue.size());
+        log.info("Queue is currently at {} starting subscriber", getRecordsCache.getPublisherSession().prefetchRecordsQueue().size());
         AtomicInteger receivedItems = new AtomicInteger(0);

         final int expectedItems = MAX_SIZE * 10;

         Object lock = new Object();

+        final boolean[] isRecordNotInorder = { false };
+        final String[] recordNotInOrderMessage = { "" };
+
         Subscriber<RecordsRetrieved> delegateSubscriber = new Subscriber<RecordsRetrieved>() {
             Subscription sub;
+            int receivedSeqNum = 0;

             @Override
             public void onSubscribe(Subscription s) {
@@ -320,6 +326,13 @@
             @Override
             public void onNext(RecordsRetrieved recordsRetrieved) {
                 receivedItems.incrementAndGet();
+                if (Integer.parseInt(((PrefetchRecordsPublisher.PrefetchRecordsRetrieved) recordsRetrieved)
+                        .lastBatchSequenceNumber()) != ++receivedSeqNum) {
+                    isRecordNotInorder[0] = true;
+                    recordNotInOrderMessage[0] = "Expected : " + receivedSeqNum + " Actual : "
+                            + ((PrefetchRecordsPublisher.PrefetchRecordsRetrieved) recordsRetrieved)
+                                    .lastBatchSequenceNumber();
+                }
                 if (receivedItems.get() >= expectedItems) {
                     synchronized (lock) {
                         log.info("Notifying waiters");
@@ -357,9 +370,10 @@
         }
         verify(getRecordsRetrievalStrategy, atLeast(expectedItems)).getRecords(anyInt());
         assertThat(receivedItems.get(), equalTo(expectedItems));
+        assertFalse(recordNotInOrderMessage[0], isRecordNotInorder[0]);
     }

-    @Test(timeout = 20000L)
+    @Test(timeout = 10000L)
     public void testNoDeadlockOnFullQueueAndLossOfNotification() {
         //
         // Fixes https://github.com/awslabs/amazon-kinesis-client/issues/602
@@ -377,12 +391,13 @@
         // Wait for the queue to fill up, and the publisher to block on adding items to the queue.
         //
         log.info("Waiting for queue to fill up");
-        while (getRecordsCache.getRecordsResultQueue.size() < MAX_SIZE) {
+        while (getRecordsCache.getPublisherSession().prefetchRecordsQueue().size() < MAX_SIZE) {
             Thread.yield();
         }

-        log.info("Queue is currently at {} starting subscriber", getRecordsCache.getRecordsResultQueue.size());
+        log.info("Queue is currently at {} starting subscriber", getRecordsCache.getPublisherSession().prefetchRecordsQueue().size());
         AtomicInteger receivedItems = new AtomicInteger(0);

         final int expectedItems = MAX_SIZE * 20;

         Object lock = new Object();

@@ -459,23 +474,23 @@

         getRecordsCache.start(sequenceNumber, initialPosition);

-        RecordsRetrieved lastProcessed = blockUntilRecordsAvailable(getRecordsCache::pollNextResultAndUpdatePrefetchCounters, 1000);
-        RecordsRetrieved expected = blockUntilRecordsAvailable(getRecordsCache::pollNextResultAndUpdatePrefetchCounters, 1000);
+        RecordsRetrieved lastProcessed = blockUntilRecordsAvailable(() -> evictPublishedEvent(getRecordsCache, "shardId"), 1000);
+        RecordsRetrieved expected = blockUntilRecordsAvailable(() -> evictPublishedEvent(getRecordsCache, "shardId"), 1000);

         //
         // Skip some of the records the cache
         //
-        blockUntilRecordsAvailable(getRecordsCache::pollNextResultAndUpdatePrefetchCounters, 1000);
-        blockUntilRecordsAvailable(getRecordsCache::pollNextResultAndUpdatePrefetchCounters, 1000);
+        blockUntilRecordsAvailable(() -> evictPublishedEvent(getRecordsCache, "shardId"), 1000);
+        blockUntilRecordsAvailable(() -> evictPublishedEvent(getRecordsCache, "shardId"), 1000);

         verify(getRecordsRetrievalStrategy, atLeast(2)).getRecords(anyInt());

-        while(getRecordsCache.getRecordsResultQueue.remainingCapacity() > 0) {
+        while(getRecordsCache.getPublisherSession().prefetchRecordsQueue().remainingCapacity() > 0) {
             Thread.yield();
         }

         getRecordsCache.restartFrom(lastProcessed);
-        RecordsRetrieved postRestart = blockUntilRecordsAvailable(getRecordsCache::pollNextResultAndUpdatePrefetchCounters, 1000);
+        RecordsRetrieved postRestart = blockUntilRecordsAvailable(() -> evictPublishedEvent(getRecordsCache, "shardId"), 1000);

         assertThat(postRestart.processRecordsInput(), eqProcessRecordsInput(expected.processRecordsInput()));
         verify(dataFetcher).resetIterator(eq(responses.get(0).nextShardIterator()),
@@ -483,6 +498,10 @@

     }

+    private RecordsRetrieved evictPublishedEvent(PrefetchRecordsPublisher publisher, String shardId) {
+        return publisher.getPublisherSession().evictPublishedRecordAndUpdateDemand(shardId);
+    }
+
     private static class RetrieverAnswer implements Answer<GetRecordsResponse> {

         private final List<GetRecordsResponse> responses;
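For context, the consumer-side sequence the restart tests above exercise, sketched against the illustrative PublisherSketch type from the top of this page (both sketches assumed to sit in the same package; the real flow goes through ShardConsumer and RxJava, not these types):

    public class RestartFlowSketch {
        public static void main(String[] args) {
            PublisherSketch publisher = new PublisherSketch();
            publisher.subscribe(e -> System.out.println("old subscriber: " + e.payload), 1);
            publisher.offer(new PublisherSketch.Event("batch-1")); // delivered to the old subscriber

            // The consumer restarts (e.g. the subscription was cancelled):
            publisher.restart(); // demand -> 0, buffered events dropped
            publisher.offer(new PublisherSketch.Event("batch-2")); // buffered, NOT delivered: no demand

            // Only the new subscription's demand unblocks publishing again, so
            // batch-2 can never reach the stale subscriber.
            publisher.subscribe(e -> System.out.println("new subscriber: " + e.payload), 1);
        }
    }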