Drain delivery queue to make slow consumers consume events at their pace (#607)

* Allowing consumers to drain the delivery queue on subscription end

* Test cases fix

* Added test cases

* Made feedback changes
This commit is contained in:
ashwing 2019-09-13 17:01:04 -07:00 committed by Micah Jaffe
parent db94cb60ef
commit a94dc7d61d
3 changed files with 438 additions and 65 deletions

View file

@ -16,6 +16,7 @@
package software.amazon.kinesis.retrieval.fanout;
import com.google.common.annotations.VisibleForTesting;
import lombok.AccessLevel;
import lombok.Data;
import lombok.Getter;
import lombok.NonNull;
@ -32,6 +33,7 @@ import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponse;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
import software.amazon.awssdk.utils.Either;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.KinesisRequestsBuilder;
@ -63,8 +65,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
private static final ThrowableCategory ACQUIRE_TIMEOUT_CATEGORY = new ThrowableCategory(
ThrowableType.ACQUIRE_TIMEOUT);
private static final ThrowableCategory READ_TIMEOUT_CATEGORY = new ThrowableCategory(ThrowableType.READ_TIMEOUT);
private static final int MAX_EVENT_BURST_FROM_SERVICE = 10;
private static final long TIME_TO_WAIT_FOR_FINAL_ACK_MILLIS = 1000;
// Max burst of 10 payload events + 1 terminal event (onError/onComplete) from the service.
private static final int MAX_EVENT_BURST_FROM_SERVICE = 10 + 1;
private final KinesisAsyncClient kinesis;
private final String shardId;
@ -82,9 +84,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
private Subscriber<? super RecordsRetrieved> subscriber;
private long availableQueueSpace = 0;
private BlockingQueue<RecordsRetrievedContext> recordsDeliveryQueue = new LinkedBlockingQueue<>(MAX_EVENT_BURST_FROM_SERVICE);
// Flag to indicate if the active subscription is being torn down.
private boolean pendingActiveSubscriptionShutdown = false;
private BlockingQueue<RecordsRetrievedContext> recordsDeliveryQueue = new LinkedBlockingQueue<>(
MAX_EVENT_BURST_FROM_SERVICE);
@Override
public void start(ExtendedSequenceNumber extendedSequenceNumber,
@ -135,13 +136,6 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
triggeringFlow = evictAckedEventAndScheduleNextEvent(recordsDeliveryAck);
} catch (Throwable t) {
errorOccurred(triggeringFlow, t);
} finally {
// Notify all the actors who are waiting for the records ack event.
// Here, when the active subscription is being torn down, the completing thread will
// wait for the last delivered records to send back the ack, to prevent sending duplicate records.
if(pendingActiveSubscriptionShutdown) {
lockObject.notifyAll();
}
}
if (triggeringFlow != null) {
updateAvailableQueueSpaceAndRequestUpstream(triggeringFlow);
@ -158,20 +152,23 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
// RecordFlow of the current event that needs to be returned
RecordFlow flowToBeReturned = null;
final RecordsRetrieved recordsRetrieved = recordsRetrievedContext != null ?
recordsRetrievedContext.getRecordsRetrieved() : null;
// Check if the ack corresponds to the head of the delivery queue.
if (recordsRetrievedContext != null && recordsRetrievedContext.getRecordsRetrieved().batchUniqueIdentifier()
if (recordsRetrieved != null && recordsRetrieved.batchUniqueIdentifier()
.equals(recordsDeliveryAck.batchUniqueIdentifier())) {
// It is now safe to remove the element
recordsDeliveryQueue.poll();
// Take action based on the time spent by the event in queue.
takeDelayedDeliveryActionIfRequired(shardId, recordsRetrievedContext.getEnqueueTimestamp(), log);
// Update current sequence number for the successfully delivered event.
currentSequenceNumber = ((FanoutRecordsRetrieved)recordsRetrievedContext.getRecordsRetrieved()).continuationSequenceNumber();
currentSequenceNumber = ((FanoutRecordsRetrieved)recordsRetrieved).continuationSequenceNumber();
// Update the triggering flow for post scheduling upstream request.
flowToBeReturned = recordsRetrievedContext.getRecordFlow();
// Try scheduling the next event in the queue, if available.
// Try scheduling the next event in the queue or execute the subscription shutdown action.
if (!recordsDeliveryQueue.isEmpty()) {
scheduleNextEvent(recordsDeliveryQueue.peek().getRecordsRetrieved());
recordsDeliveryQueue.peek().executeEventAction(subscriber);
}
} else {
// Check if the mismatched event belongs to active flow. If publisher receives an ack for a
@ -197,14 +194,14 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
@VisibleForTesting
void bufferCurrentEventAndScheduleIfRequired(RecordsRetrieved recordsRetrieved, RecordFlow triggeringFlow) {
final RecordsRetrievedContext recordsRetrievedContext =
new RecordsRetrievedContext(recordsRetrieved, triggeringFlow, Instant.now());
new RecordsRetrievedContext(Either.left(recordsRetrieved), triggeringFlow, Instant.now());
try {
// Try enqueueing the RecordsRetrieved batch to the queue, which would throw exception on failure.
// Note: This does not block wait to enqueue.
recordsDeliveryQueue.add(recordsRetrievedContext);
// If the current batch is the only element in the queue, then try scheduling the event delivery.
if (recordsDeliveryQueue.size() == 1) {
scheduleNextEvent(recordsRetrieved);
subscriber.onNext(recordsRetrieved);
}
} catch (IllegalStateException e) {
log.warn("{}: Unable to enqueue the payload due to capacity restrictions in delivery queue with remaining capacity {} ",
@ -216,19 +213,40 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
}
}
// This method is not thread-safe. You need to acquire a lock in the caller in order to execute this.
// Schedule the next event only when the active subscription is not pending shutdown.
private void scheduleNextEvent(RecordsRetrieved recordsRetrieved) {
if (!pendingActiveSubscriptionShutdown) {
subscriber.onNext(recordsRetrieved);
@Data
private static final class RecordsRetrievedContext {
@Getter(AccessLevel.NONE)
private final Either<RecordsRetrieved, SubscriptionShutdownEvent> recordsOrShutdownEvent;
private final RecordFlow recordFlow;
private final Instant enqueueTimestamp;
RecordsRetrieved getRecordsRetrieved() {
return recordsOrShutdownEvent.map(recordsEvent -> recordsEvent, shutdownEvent -> null);
}
// This method is not thread-safe. You need to acquire a lock in the caller in order to execute this.
void executeEventAction(Subscriber<? super RecordsRetrieved> subscriber) {
recordsOrShutdownEvent.apply(recordsEvent -> subscriber.onNext(recordsEvent),
shutdownEvent -> shutdownEvent.getSubscriptionShutdownAction().run());
}
}
@Data
private static final class RecordsRetrievedContext {
private final RecordsRetrieved recordsRetrieved;
private final RecordFlow recordFlow;
private final Instant enqueueTimestamp;
@Getter
private static final class SubscriptionShutdownEvent {
private final Runnable subscriptionShutdownAction;
private final String eventIdentifier;
private final Throwable shutdownEventThrowableOptional;
SubscriptionShutdownEvent(Runnable subscriptionShutdownAction, String eventIdentifier, Throwable shutdownEventThrowableOptional) {
this.subscriptionShutdownAction = subscriptionShutdownAction;
this.eventIdentifier = eventIdentifier;
this.shutdownEventThrowableOptional = shutdownEventThrowableOptional;
}
SubscriptionShutdownEvent(Runnable subscriptionShutdownAction, String eventIdentifier) {
this(subscriptionShutdownAction, eventIdentifier, null);
}
}
private boolean hasValidSubscriber() {
@ -280,9 +298,6 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
return;
}
// Clear the delivery buffer so that next subscription don't yield duplicate records.
resetRecordsDeliveryStateOnSubscriptionShutdown();
Throwable propagationThrowable = t;
ThrowableCategory category = throwableCategory(propagationThrowable);
@ -339,31 +354,6 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
+ "previous subscription - {} ", shardId, subscribeToShardId);
recordsDeliveryQueue.clear();
}
if(pendingActiveSubscriptionShutdown) {
log.warn("{}: Found current subscription to be in pendingShutdown state while initializing. This indicates unsuccessful clean up of"
+ "previous subscription - {} ", shardId, subscribeToShardId);
// Set pendingActiveSubscriptionShutdown to default value.
pendingActiveSubscriptionShutdown = false;
}
}
// This method is not thread safe. This needs to be executed after acquiring lock on this.lockObject
private void resetRecordsDeliveryStateOnSubscriptionShutdown() {
// Wait for final event notification during the end of the subscription.
if (!recordsDeliveryQueue.isEmpty()) {
// This will prevent further events from getting scheduled, during the wait period.
pendingActiveSubscriptionShutdown = true;
try {
// Wait for the configured time to get a notification for already delivered event, if any.
lockObject.wait(TIME_TO_WAIT_FOR_FINAL_ACK_MILLIS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// Clear the queue to remove any remaining entries from the queue.
recordsDeliveryQueue.clear();
// Set pendingActiveSubscriptionShutdown to default value.
pendingActiveSubscriptionShutdown = false;
}
}
protected void logAcquireTimeoutMessage(Throwable t) {
@ -490,13 +480,15 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
}
}
private boolean shouldShutdownSubscriptionNow() {
return recordsDeliveryQueue.isEmpty();
}
private void onComplete(RecordFlow triggeringFlow) {
synchronized (lockObject) {
log.debug("{}: [SubscriptionLifetime]: (FanOutRecordsPublisher#onComplete) @ {} id: {}", shardId,
triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId);
resetRecordsDeliveryStateOnSubscriptionShutdown();
triggeringFlow.cancel();
if (!hasValidSubscriber()) {
log.debug("{}: [SubscriptionLifetime]: (FanOutRecordsPublisher#onComplete) @ {} id: {}", shardId,
@ -512,7 +504,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
}
if (currentSequenceNumber != null) {
log.debug("{}: Shard hasn't ended resubscribing.", shardId);
log.debug("{}: Shard hasn't ended. Resubscribing.", shardId);
subscribeToShard(currentSequenceNumber);
} else {
log.debug("{}: Shard has ended completing subscriber.", shardId);
@ -732,6 +724,18 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
@Override
public void exceptionOccurred(Throwable throwable) {
synchronized (parent.lockObject) {
if (parent.shouldShutdownSubscriptionNow()) {
executeExceptionOccurred(throwable);
} else {
final SubscriptionShutdownEvent subscriptionShutdownEvent = new SubscriptionShutdownEvent(
() -> executeExceptionOccurred(throwable), "onError", throwable);
tryEnqueueSubscriptionShutdownEvent(subscriptionShutdownEvent);
}
}
}
private void executeExceptionOccurred(Throwable throwable) {
synchronized (parent.lockObject) {
log.debug("{}: [SubscriptionLifetime]: (RecordFlow#exceptionOccurred) @ {} id: {} -- {}: {}",
@ -759,6 +763,32 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
@Override
public void complete() {
synchronized (parent.lockObject) {
if (parent.shouldShutdownSubscriptionNow()) {
executeComplete();
} else {
final SubscriptionShutdownEvent subscriptionShutdownEvent = new SubscriptionShutdownEvent(
() -> executeComplete(), "onComplete");
tryEnqueueSubscriptionShutdownEvent(subscriptionShutdownEvent);
}
}
}
// This method is not thread safe. This needs to be executed after acquiring lock on parent.lockObject
private void tryEnqueueSubscriptionShutdownEvent(SubscriptionShutdownEvent subscriptionShutdownEvent) {
try {
parent.recordsDeliveryQueue
.add(new RecordsRetrievedContext(Either.right(subscriptionShutdownEvent), this, Instant.now()));
} catch (Exception e) {
log.warn(
"{}: Unable to enqueue the {} shutdown event due to capacity restrictions in delivery queue with remaining capacity {}. Ignoring. ",
parent.shardId, subscriptionShutdownEvent.getEventIdentifier(),
parent.recordsDeliveryQueue.remainingCapacity(),
subscriptionShutdownEvent.getShutdownEventThrowableOptional());
}
}
private void executeComplete() {
synchronized (parent.lockObject) {
log.debug("{}: [SubscriptionLifetime]: (RecordFlow#complete) @ {} id: {} -- Connection completed",
parent.shardId, connectionStartedAt, subscribeToShardId);

View file

@ -8,6 +8,7 @@ import io.reactivex.schedulers.Schedulers;
import io.reactivex.subscribers.SafeSubscriber;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
@ -65,6 +66,7 @@ import java.util.stream.Stream;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@ -405,6 +407,312 @@ public class FanOutRecordsPublisherTest {
}
@Test
public void testIfStreamOfEventsAndOnCompleteAreDeliveredInOrderWithBackpressureAdheringServicePublisher() throws Exception {
FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN);
ArgumentCaptor<FanOutRecordsPublisher.RecordSubscription> captor = ArgumentCaptor
.forClass(FanOutRecordsPublisher.RecordSubscription.class);
ArgumentCaptor<FanOutRecordsPublisher.RecordFlow> flowCaptor = ArgumentCaptor
.forClass(FanOutRecordsPublisher.RecordFlow.class);
List<Record> records = Stream.of(1, 2, 3).map(this::makeRecord).collect(Collectors.toList());
Consumer<Integer> servicePublisherAction = contSeqNum -> captor.getValue().onNext(
SubscribeToShardEvent.builder()
.millisBehindLatest(100L)
.continuationSequenceNumber(contSeqNum + "")
.records(records)
.build());
CountDownLatch servicePublisherTaskCompletionLatch = new CountDownLatch(2);
int totalServicePublisherEvents = 1000;
int initialDemand = 10;
int triggerCompleteAtNthEvent = 200;
BackpressureAdheringServicePublisher servicePublisher = new BackpressureAdheringServicePublisher(
servicePublisherAction, totalServicePublisherEvents, servicePublisherTaskCompletionLatch,
initialDemand);
servicePublisher.setCompleteTrigger(triggerCompleteAtNthEvent, () -> flowCaptor.getValue().complete());
doNothing().when(publisher).subscribe(captor.capture());
source.start(ExtendedSequenceNumber.LATEST,
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST));
List<ProcessRecordsInput> receivedInput = new ArrayList<>();
Subscriber<RecordsRetrieved> shardConsumerSubscriber = new ShardConsumerNotifyingSubscriber(
new Subscriber<RecordsRetrieved>() {
private Subscription subscription;
private int lastSeenSeqNum = 0;
@Override public void onSubscribe(Subscription s) {
subscription = s;
subscription.request(1);
servicePublisher.request(1);
}
@Override public void onNext(RecordsRetrieved input) {
receivedInput.add(input.processRecordsInput());
assertEquals("" + ++lastSeenSeqNum, ((FanOutRecordsPublisher.FanoutRecordsRetrieved)input).continuationSequenceNumber());
subscription.request(1);
servicePublisher.request(1);
if(receivedInput.size() == triggerCompleteAtNthEvent) {
servicePublisherTaskCompletionLatch.countDown();
}
}
@Override public void onError(Throwable t) {
log.error("Caught throwable in subscriber", t);
fail("Caught throwable in subscriber");
}
@Override public void onComplete() {
fail("OnComplete called when not expected");
}
}, source);
ExecutorService executorService = getTestExecutor();
Scheduler testScheduler = getScheduler(getInitiallyBlockingExecutor(getSpiedExecutor(executorService)));
int bufferSize = 8;
Flowable.fromPublisher(source).subscribeOn(testScheduler).observeOn(testScheduler, true, bufferSize)
.subscribe(shardConsumerSubscriber);
verify(kinesisClient, times(1)).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture());
flowCaptor.getValue().onEventStream(publisher);
captor.getValue().onSubscribe(subscription);
List<KinesisClientRecordMatcher> matchers = records.stream().map(KinesisClientRecordMatcher::new)
.collect(Collectors.toList());
executorService.submit(servicePublisher);
servicePublisherTaskCompletionLatch.await(5000, TimeUnit.MILLISECONDS);
assertThat(receivedInput.size(), equalTo(triggerCompleteAtNthEvent));
receivedInput.stream().map(ProcessRecordsInput::records).forEach(clientRecordsList -> {
assertThat(clientRecordsList.size(), equalTo(matchers.size()));
for (int i = 0; i < clientRecordsList.size(); ++i) {
assertThat(clientRecordsList.get(i), matchers.get(i));
}
});
assertThat(source.getCurrentSequenceNumber(), equalTo(triggerCompleteAtNthEvent + ""));
// In non-shard end cases, upon successful completion, the publisher would re-subscribe to service.
verify(kinesisClient, times(2)).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture());
}
@Test
public void testIfShardEndEventAndOnCompleteAreDeliveredInOrderWithBackpressureAdheringServicePublisher() throws Exception {
FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN);
ArgumentCaptor<FanOutRecordsPublisher.RecordSubscription> captor = ArgumentCaptor
.forClass(FanOutRecordsPublisher.RecordSubscription.class);
ArgumentCaptor<FanOutRecordsPublisher.RecordFlow> flowCaptor = ArgumentCaptor
.forClass(FanOutRecordsPublisher.RecordFlow.class);
List<Record> records = Stream.of(1, 2, 3).map(this::makeRecord).collect(Collectors.toList());
Consumer<Integer> servicePublisherAction = contSeqNum -> captor.getValue().onNext(
SubscribeToShardEvent.builder()
.millisBehindLatest(100L)
.continuationSequenceNumber(contSeqNum + "")
.records(records)
.build());
Consumer<Integer> servicePublisherShardEndAction = contSeqNum -> captor.getValue().onNext(
SubscribeToShardEvent.builder()
.millisBehindLatest(100L)
.continuationSequenceNumber(null)
.records(records)
.build());
CountDownLatch servicePublisherTaskCompletionLatch = new CountDownLatch(2);
int totalServicePublisherEvents = 1000;
int initialDemand = 10;
int triggerCompleteAtNthEvent = 200;
BackpressureAdheringServicePublisher servicePublisher = new BackpressureAdheringServicePublisher(
servicePublisherAction, totalServicePublisherEvents, servicePublisherTaskCompletionLatch,
initialDemand);
servicePublisher
.setShardEndAndCompleteTrigger(triggerCompleteAtNthEvent, () -> flowCaptor.getValue().complete(),
servicePublisherShardEndAction);
doNothing().when(publisher).subscribe(captor.capture());
source.start(ExtendedSequenceNumber.LATEST,
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST));
List<ProcessRecordsInput> receivedInput = new ArrayList<>();
final boolean[] isOnCompleteTriggered = { false };
Subscriber<RecordsRetrieved> shardConsumerSubscriber = new ShardConsumerNotifyingSubscriber(
new Subscriber<RecordsRetrieved>() {
private Subscription subscription;
private int lastSeenSeqNum = 0;
@Override public void onSubscribe(Subscription s) {
subscription = s;
subscription.request(1);
servicePublisher.request(1);
}
@Override public void onNext(RecordsRetrieved input) {
receivedInput.add(input.processRecordsInput());
subscription.request(1);
servicePublisher.request(1);
if(receivedInput.size() == triggerCompleteAtNthEvent) {
servicePublisherTaskCompletionLatch.countDown();
}
}
@Override public void onError(Throwable t) {
log.error("Caught throwable in subscriber", t);
fail("Caught throwable in subscriber");
}
@Override public void onComplete() {
isOnCompleteTriggered[0] = true;
}
}, source);
ExecutorService executorService = getTestExecutor();
Scheduler testScheduler = getScheduler(getInitiallyBlockingExecutor(getSpiedExecutor(executorService)));
int bufferSize = 8;
Flowable.fromPublisher(source).subscribeOn(testScheduler).observeOn(testScheduler, true, bufferSize)
.subscribe(shardConsumerSubscriber);
verify(kinesisClient, times(1)).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture());
flowCaptor.getValue().onEventStream(publisher);
captor.getValue().onSubscribe(subscription);
List<KinesisClientRecordMatcher> matchers = records.stream().map(KinesisClientRecordMatcher::new)
.collect(Collectors.toList());
executorService.submit(servicePublisher);
servicePublisherTaskCompletionLatch.await(5000, TimeUnit.MILLISECONDS);
assertThat(receivedInput.size(), equalTo(triggerCompleteAtNthEvent));
receivedInput.stream().map(ProcessRecordsInput::records).forEach(clientRecordsList -> {
assertThat(clientRecordsList.size(), equalTo(matchers.size()));
for (int i = 0; i < clientRecordsList.size(); ++i) {
assertThat(clientRecordsList.get(i), matchers.get(i));
}
});
assertNull(source.getCurrentSequenceNumber());
// With shard end event, onComplete must be propagated to the subscriber.
assertTrue("OnComplete should be triggered", isOnCompleteTriggered[0]);
}
@Test
public void testIfStreamOfEventsAndOnErrorAreDeliveredInOrderWithBackpressureAdheringServicePublisher() throws Exception {
FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN);
ArgumentCaptor<FanOutRecordsPublisher.RecordSubscription> captor = ArgumentCaptor
.forClass(FanOutRecordsPublisher.RecordSubscription.class);
ArgumentCaptor<FanOutRecordsPublisher.RecordFlow> flowCaptor = ArgumentCaptor
.forClass(FanOutRecordsPublisher.RecordFlow.class);
List<Record> records = Stream.of(1, 2, 3).map(this::makeRecord).collect(Collectors.toList());
Consumer<Integer> servicePublisherAction = contSeqNum -> captor.getValue().onNext(
SubscribeToShardEvent.builder()
.millisBehindLatest(100L)
.continuationSequenceNumber(contSeqNum + "")
.records(records)
.build());
CountDownLatch servicePublisherTaskCompletionLatch = new CountDownLatch(2);
int totalServicePublisherEvents = 1000;
int initialDemand = 10;
int triggerErrorAtNthEvent = 241;
BackpressureAdheringServicePublisher servicePublisher = new BackpressureAdheringServicePublisher(
servicePublisherAction, totalServicePublisherEvents, servicePublisherTaskCompletionLatch,
initialDemand);
servicePublisher.setErrorTrigger(triggerErrorAtNthEvent,
() -> flowCaptor.getValue().exceptionOccurred(new RuntimeException("Service Exception")));
doNothing().when(publisher).subscribe(captor.capture());
source.start(ExtendedSequenceNumber.LATEST,
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST));
final boolean[] isOnErrorThrown = { false };
List<ProcessRecordsInput> receivedInput = new ArrayList<>();
Subscriber<RecordsRetrieved> shardConsumerSubscriber = new ShardConsumerNotifyingSubscriber(
new Subscriber<RecordsRetrieved>() {
private Subscription subscription;
private int lastSeenSeqNum = 0;
@Override public void onSubscribe(Subscription s) {
subscription = s;
subscription.request(1);
servicePublisher.request(1);
}
@Override public void onNext(RecordsRetrieved input) {
receivedInput.add(input.processRecordsInput());
assertEquals("" + ++lastSeenSeqNum, ((FanOutRecordsPublisher.FanoutRecordsRetrieved)input).continuationSequenceNumber());
subscription.request(1);
servicePublisher.request(1);
if(receivedInput.size() == triggerErrorAtNthEvent) {
servicePublisherTaskCompletionLatch.countDown();
}
}
@Override public void onError(Throwable t) {
log.error("Caught throwable in subscriber", t);
isOnErrorThrown[0] = true;
}
@Override public void onComplete() {
fail("OnComplete called when not expected");
}
}, source);
ExecutorService executorService = getTestExecutor();
Scheduler testScheduler = getScheduler(getInitiallyBlockingExecutor(getSpiedExecutor(executorService)));
int bufferSize = 8;
Flowable.fromPublisher(source).subscribeOn(testScheduler).observeOn(testScheduler, true, bufferSize)
.subscribe(shardConsumerSubscriber);
verify(kinesisClient).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture());
flowCaptor.getValue().onEventStream(publisher);
captor.getValue().onSubscribe(subscription);
List<KinesisClientRecordMatcher> matchers = records.stream().map(KinesisClientRecordMatcher::new)
.collect(Collectors.toList());
executorService.submit(servicePublisher);
servicePublisherTaskCompletionLatch.await(5000, TimeUnit.MILLISECONDS);
assertThat(receivedInput.size(), equalTo(triggerErrorAtNthEvent));
receivedInput.stream().map(ProcessRecordsInput::records).forEach(clientRecordsList -> {
assertThat(clientRecordsList.size(), equalTo(matchers.size()));
for (int i = 0; i < clientRecordsList.size(); ++i) {
assertThat(clientRecordsList.get(i), matchers.get(i));
}
});
assertThat(source.getCurrentSequenceNumber(), equalTo(triggerErrorAtNthEvent + ""));
assertTrue("OnError should have been thrown", isOnErrorThrown[0]);
}
@Test
public void testIfStreamOfEventsAreDeliveredInOrderWithBackpressureAdheringServicePublisherHavingInitialBurstWithinLimit() throws Exception {
FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN);
@ -517,7 +825,7 @@ public class FanOutRecordsPublisherTest {
CountDownLatch servicePublisherTaskCompletionLatch = new CountDownLatch(1);
int totalServicePublisherEvents = 1000;
int initialDemand = 10;
int initialDemand = 11;
BackpressureAdheringServicePublisher servicePublisher =
new BackpressureAdheringServicePublisher(servicePublisherAction, totalServicePublisherEvents, servicePublisherTaskCompletionLatch, initialDemand);
@ -844,7 +1152,7 @@ public class FanOutRecordsPublisherTest {
@Override public void onComplete() {}
});
try {
IntStream.rangeClosed(1, 11).forEach(
IntStream.rangeClosed(1, 12).forEach(
i -> fanOutRecordsPublisher.bufferCurrentEventAndScheduleIfRequired(recordsRetrieved, recordFlow));
fail("Should throw Queue full exception");
} catch (IllegalStateException e) {
@ -1131,10 +1439,17 @@ public class FanOutRecordsPublisherTest {
private final Integer numOfTimes;
private final CountDownLatch taskCompletionLatch;
private final Semaphore demandNotifier;
private Integer sendCompletionAt;
private Runnable completeAction;
private Integer sendErrorAt;
private Runnable errorAction;
private Consumer<Integer> shardEndAction;
BackpressureAdheringServicePublisher(Consumer<Integer> action, Integer numOfTimes,
CountDownLatch taskCompletionLatch, Integer initialDemand) {
this(action, numOfTimes, taskCompletionLatch, new Semaphore(initialDemand));
sendCompletionAt = Integer.MAX_VALUE;
sendErrorAt = Integer.MAX_VALUE;
}
public void request(int n) {
@ -1144,10 +1459,39 @@ public class FanOutRecordsPublisherTest {
public void run() {
for (int i = 1; i <= numOfTimes; ) {
demandNotifier.acquireUninterruptibly();
if(i == sendCompletionAt) {
if(shardEndAction != null) {
shardEndAction.accept(i++);
} else {
action.accept(i++);
}
completeAction.run();
break;
}
if(i == sendErrorAt) {
action.accept(i++);
errorAction.run();
break;
}
action.accept(i++);
}
taskCompletionLatch.countDown();
}
public void setCompleteTrigger(Integer sendCompletionAt, Runnable completeAction) {
this.sendCompletionAt = sendCompletionAt;
this.completeAction = completeAction;
}
public void setShardEndAndCompleteTrigger(Integer sendCompletionAt, Runnable completeAction, Consumer<Integer> shardEndAction) {
setCompleteTrigger(sendCompletionAt, completeAction);
this.shardEndAction = shardEndAction;
}
public void setErrorTrigger(Integer sendErrorAt, Runnable errorAction) {
this.sendErrorAt = sendErrorAt;
this.errorAction = errorAction;
}
}
private Record makeRecord(String sequenceNumber) {

View file

@ -304,7 +304,7 @@ public class PrefetchRecordsPublisherTest {
log.info("Queue is currently at {} starting subscriber", getRecordsCache.getRecordsResultQueue.size());
AtomicInteger receivedItems = new AtomicInteger(0);
final int expectedItems = MAX_SIZE * 1000;
final int expectedItems = MAX_SIZE * 10;
Object lock = new Object();
@ -383,7 +383,7 @@ public class PrefetchRecordsPublisherTest {
log.info("Queue is currently at {} starting subscriber", getRecordsCache.getRecordsResultQueue.size());
AtomicInteger receivedItems = new AtomicInteger(0);
final int expectedItems = MAX_SIZE * 100;
final int expectedItems = MAX_SIZE * 20;
Object lock = new Object();
@ -521,7 +521,7 @@ public class PrefetchRecordsPublisherTest {
private static class LossyNotificationSubscriber extends ShardConsumerNotifyingSubscriber {
private static final int LOSS_EVERY_NTH_RECORD = 100;
private static final int LOSS_EVERY_NTH_RECORD = 50;
private static int recordCounter = 0;
private static final ScheduledExecutorService consumerHealthChecker = Executors.newScheduledThreadPool(1);
@ -531,7 +531,6 @@ public class PrefetchRecordsPublisherTest {
@Override
public void onNext(RecordsRetrieved recordsRetrieved) {
log.info("Subscriber received onNext");
if (!(recordCounter % LOSS_EVERY_NTH_RECORD == LOSS_EVERY_NTH_RECORD - 1)) {
getRecordsPublisher().notify(getRecordsDeliveryAck(recordsRetrieved));
getDelegateSubscriber().onNext(recordsRetrieved);