Limited-threads resiliency fix: durable, non-blocking record delivery (#573)

* Adding unit test case for record delivery validation

* Initial prototype for a notification mechanism between ShardConsumerSubscriber and FanoutPublisher. The SDK threads block-wait on the ack from the ShardConsumerSubscriber

* Initial non-blocking prototype

* Refactoring src and test

* Added unit test cases. Addressed review comments. Handled edge cases

* Minor code changes. Note that the previous commit has the blocking implementation of PrefetchPublisher

* Refactored the cleanup logic

* Fix for CloudWatch exception handling and other review comment fixes

* Typo fix

* Removing CloudWatch fix. It will be released in a separate commit.

* Changing RejectedTaskEvent log message for the release

* Added javadoc to RecordsDeliveryAck and optimized imports

* Adding Kinesis Internal API tag for new concrete implementations
ashwing 2019-08-16 14:24:19 -07:00 committed by Micah Jaffe
parent c2a3f18670
commit 3f6afc6563
15 changed files with 1158 additions and 85 deletions
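The core of the change is an acknowledgement handshake: the record publisher buffers every batch it hands to the subscriber and releases the next batch only after the subscriber's onNext confirms receipt. A minimal standalone sketch of that handshake, assuming hypothetical names (AckingPublisher, plain String batches) rather than the KCL types in the diffs below:

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

// Sketch of the ack-gated delivery loop this commit introduces; names are
// illustrative, not the KCL API. Only the head of the queue is ever in flight.
final class AckingPublisher {
    private final Queue<String> pending = new ArrayDeque<>();
    private Consumer<String> subscriber;

    void subscribe(Consumer<String> subscriber) {
        this.subscriber = subscriber;
    }

    // Called by the upstream source; delivers immediately only when nothing is in flight.
    synchronized void publish(String batch) {
        pending.add(batch);
        if (pending.size() == 1) {
            subscriber.accept(batch);
        }
    }

    // Called by the subscriber once it has received the batch.
    synchronized void ack(String batch) {
        if (batch.equals(pending.peek())) {
            pending.poll();                        // evict the acked head
            if (!pending.isEmpty()) {
                subscriber.accept(pending.peek()); // schedule the next event
            }
        }
    }
}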

@@ -0,0 +1,50 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.common;
import org.slf4j.Logger;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import java.time.Duration;
import java.time.Instant;
import static software.amazon.kinesis.lifecycle.ShardConsumer.MAX_TIME_BETWEEN_REQUEST_RESPONSE;
@KinesisClientInternalApi
public class DiagnosticUtils {
/**
* Util for RecordsPublisher to measure the event delivery latency of the executor service and take appropriate action.
* @param shardId of the shard that is having delayed delivery
* @param enqueueTimestamp of the event submitted to the executor service
* @param log Slf4j Logger from the RecordsPublisher to log the events
*/
public static void takeDelayedDeliveryActionIfRequired(String shardId, Instant enqueueTimestamp, Logger log) {
final long durationBetweenEnqueueAndAckInMillis = Duration
.between(enqueueTimestamp, Instant.now()).toMillis();
if (durationBetweenEnqueueAndAckInMillis > MAX_TIME_BETWEEN_REQUEST_RESPONSE / 3) {
// The above condition logs a warning when the delivery time exceeds MAX_TIME_BETWEEN_REQUEST_RESPONSE / 3 (roughly 11 seconds).
log.warn(
"{}: Record delivery time to shard consumer is high at {} millis. Check the ExecutorStateEvent logs"
+ " to see the state of the executor service. Also check if the RecordProcessor's processing "
+ "time is high. ",
shardId, durationBetweenEnqueueAndAckInMillis);
} else if (log.isDebugEnabled()) {
log.debug("{}: Record delivery time to shard consumer is {} millis", shardId,
durationBetweenEnqueueAndAckInMillis);
}
}
}
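A hedged usage sketch for the util above: the publisher records the instant it hands an event to the executor service and calls the util once the ack arrives. The class and shard id here are illustrative, not part of this diff:

import java.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.kinesis.common.DiagnosticUtils;

// Illustrative call site: measures how long an event sat between being handed
// to the executor service and being acknowledged by the subscriber.
class DeliveryTimingExample {
    private static final Logger log = LoggerFactory.getLogger(DeliveryTimingExample.class);

    void onAck(Instant enqueueTimestamp) {
        // Warns when delivery exceeds MAX_TIME_BETWEEN_REQUEST_RESPONSE / 3.
        DiagnosticUtils.takeDelayedDeliveryActionIfRequired("shardId-000000000000", enqueueTimestamp, log);
    }
}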

@@ -26,7 +26,7 @@ import software.amazon.kinesis.annotations.KinesisClientInternalApi;
@KinesisClientInternalApi
class RejectedTaskEvent implements DiagnosticEvent {
private static final String MESSAGE = "Review your thread configuration to prevent task rejections. " +
"Until next release, KCL will not be resilient to task rejections. ";
"Task rejections will slow down your application and some shards may stop processing. ";
private ExecutorStateEvent executorStateEvent;
private Throwable throwable;

@@ -0,0 +1,68 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.lifecycle;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RecordsRetrieved;
import software.amazon.kinesis.retrieval.RecordsDeliveryAck;
/**
* Subscriber that notifies its publisher on receipt of the onNext event.
*/
public interface NotifyingSubscriber extends Subscriber<RecordsRetrieved> {
/**
* Return the actual subscriber to which the events need to be delegated.
* @return Subscriber<RecordsRetrieved> to be delegated
*/
Subscriber<RecordsRetrieved> getDelegateSubscriber();
/**
* Return the publisher to be notified
* @return RecordsPublisher to be notified.
*/
RecordsPublisher getRecordsPublisher();
/**
* Construct a RecordsDeliveryAck object from the incoming data and return it.
* @param recordsRetrieved the batch for which the ack is needed.
* @return RecordsDeliveryAck for the given batch
*/
RecordsDeliveryAck getRecordsDeliveryAck(RecordsRetrieved recordsRetrieved);
@Override
default void onSubscribe(Subscription subscription) {
getDelegateSubscriber().onSubscribe(subscription);
}
@Override
default void onNext(RecordsRetrieved recordsRetrieved) {
getRecordsPublisher().notify(getRecordsDeliveryAck(recordsRetrieved));
getDelegateSubscriber().onNext(recordsRetrieved);
}
@Override
default void onError(Throwable throwable) {
getDelegateSubscriber().onError(throwable);
}
@Override
default void onComplete() {
getDelegateSubscriber().onComplete();
}
}

@@ -0,0 +1,47 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.lifecycle;
import lombok.AllArgsConstructor;
import org.reactivestreams.Subscriber;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RecordsRetrieved;
import software.amazon.kinesis.retrieval.RecordsDeliveryAck;
@KinesisClientInternalApi
@AllArgsConstructor
public class ShardConsumerNotifyingSubscriber implements NotifyingSubscriber {
private final Subscriber<RecordsRetrieved> delegate;
private final RecordsPublisher recordsPublisher;
@Override
public Subscriber<RecordsRetrieved> getDelegateSubscriber() {
return delegate;
}
@Override
public RecordsPublisher getRecordsPublisher() {
return recordsPublisher;
}
@Override
public RecordsDeliveryAck getRecordsDeliveryAck(RecordsRetrieved recordsRetrieved) {
return () -> recordsRetrieved.batchUniqueIdentifier();
}
}

@@ -82,7 +82,7 @@ class ShardConsumerSubscriber implements Subscriber<RecordsRetrieved> {
recordsPublisher.restartFrom(lastAccepted);
}
Flowable.fromPublisher(recordsPublisher).subscribeOn(scheduler).observeOn(scheduler, true, bufferSize)
.subscribe(this);
.subscribe(new ShardConsumerNotifyingSubscriber(this, recordsPublisher));
}
}
@@ -216,4 +216,5 @@ class ShardConsumerSubscriber implements Subscriber<RecordsRetrieved> {
subscription.cancel();
}
}
}

@@ -0,0 +1,26 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.retrieval;
import lombok.Data;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
@KinesisClientInternalApi
@Data
public class BatchUniqueIdentifier {
private final String recordBatchIdentifier;
private final String flowIdentifier;
}
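Lombok's @Data generates field-by-field equals and hashCode, which is what the fan-out publisher relies on when it matches an incoming ack against the head of its delivery queue. A small illustration with made-up identifier values:

import software.amazon.kinesis.retrieval.BatchUniqueIdentifier;

class BatchIdentityExample {
    public static void main(String[] args) {
        BatchUniqueIdentifier delivered = new BatchUniqueIdentifier("uuid-1234", "shard-001-1");
        BatchUniqueIdentifier acked = new BatchUniqueIdentifier("uuid-1234", "shard-001-1");
        System.out.println(delivered.equals(acked)); // true: same batch, same flow
        // A differing flowIdentifier marks an ack from a stale subscription.
        BatchUniqueIdentifier stale = new BatchUniqueIdentifier("uuid-1234", "shard-001-0");
        System.out.println(delivered.equals(stale)); // false
    }
}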

@@ -0,0 +1,29 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.retrieval;
/**
* Interface to supply all the meta information for record delivery ack.
*/
public interface RecordsDeliveryAck {
/**
* Unique record batch identifier, used to ensure durability and ordering guarantees.
* @return id that uniquely determines a record batch and its source.
*/
BatchUniqueIdentifier batchUniqueIdentifier();
}
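Since the interface has a single abstract method, an ack can be supplied as a lambda, which is exactly how ShardConsumerNotifyingSubscriber above builds its acks. A self-contained illustration with made-up identifier values:

import software.amazon.kinesis.retrieval.BatchUniqueIdentifier;
import software.amazon.kinesis.retrieval.RecordsDeliveryAck;

class AckLambdaExample {
    public static void main(String[] args) {
        BatchUniqueIdentifier id = new BatchUniqueIdentifier("uuid-1234", "flow-1");
        RecordsDeliveryAck ack = () -> id; // a lambda is a complete implementation
        System.out.println(ack.batchUniqueIdentifier());
    }
}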

@@ -18,7 +18,6 @@ package software.amazon.kinesis.retrieval;
import org.reactivestreams.Publisher;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
/**
@@ -47,4 +46,12 @@ public interface RecordsPublisher extends Publisher<RecordsRetrieved> {
* Shuts down the publisher. Once this method returns, the publisher should no longer provide any records.
*/
void shutdown();
/**
* Notify the publisher on receipt of a data event.
* @param ack acknowledgement received from the subscriber.
*/
default void notify(RecordsDeliveryAck ack) {
throw new UnsupportedOperationException("RecordsPublisher does not support acknowledgement from Subscriber");
}
}

@@ -24,4 +24,13 @@ public interface RecordsRetrieved {
* @return the processRecordsInput received
*/
ProcessRecordsInput processRecordsInput();
/**
* Returns the identifier that uniquely identifies this batch.
*
* @return batchUniqueIdentifier that uniquely identifies the records batch and its source.
*/
default BatchUniqueIdentifier batchUniqueIdentifier() {
throw new UnsupportedOperationException("Retrieval of batch unique identifier is not supported");
}
}

@@ -15,20 +15,15 @@
package software.amazon.kinesis.retrieval.fanout;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import com.google.common.annotations.VisibleForTesting;
import lombok.Data;
import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
@@ -41,13 +36,26 @@ import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.KinesisRequestsBuilder;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.retrieval.BatchUniqueIdentifier;
import software.amazon.kinesis.retrieval.IteratorBuilder;
import software.amazon.kinesis.retrieval.KinesisClientRecord;
import software.amazon.kinesis.retrieval.RecordsDeliveryAck;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RecordsRetrieved;
import software.amazon.kinesis.retrieval.RetryableRetrievalException;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static software.amazon.kinesis.common.DiagnosticUtils.takeDelayedDeliveryActionIfRequired;
@RequiredArgsConstructor
@Slf4j
@KinesisClientInternalApi
@@ -55,6 +63,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
private static final ThrowableCategory ACQUIRE_TIMEOUT_CATEGORY = new ThrowableCategory(
ThrowableType.ACQUIRE_TIMEOUT);
private static final ThrowableCategory READ_TIMEOUT_CATEGORY = new ThrowableCategory(ThrowableType.READ_TIMEOUT);
private static final int MAX_EVENT_BURST_FROM_SERVICE = 10;
private final KinesisAsyncClient kinesis;
private final String shardId;
@@ -63,9 +72,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
private final Object lockObject = new Object();
private final AtomicInteger subscribeToShardId = new AtomicInteger(0);
private RecordFlow flow;
@Getter @VisibleForTesting
private String currentSequenceNumber;
private InitialPositionInStreamExtended initialPositionInStreamExtended;
private boolean isFirstConnection = true;
@@ -73,6 +81,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
private Subscriber<? super RecordsRetrieved> subscriber;
private long availableQueueSpace = 0;
private BlockingQueue<RecordsRetrievedContext> recordsDeliveryQueue = new LinkedBlockingQueue<>(MAX_EVENT_BURST_FROM_SERVICE);
@Override
public void start(ExtendedSequenceNumber extendedSequenceNumber,
InitialPositionInStreamExtended initialPositionInStreamExtended) {
@@ -114,12 +124,107 @@
}
}
@Override
public void notify(RecordsDeliveryAck recordsDeliveryAck) {
synchronized (lockObject) {
RecordFlow triggeringFlow = null;
try {
triggeringFlow = evictAckedEventAndScheduleNextEvent(recordsDeliveryAck);
} catch (Throwable t) {
errorOccurred(triggeringFlow, t);
}
if (triggeringFlow != null) {
updateAvailableQueueSpaceAndRequestUpstream(triggeringFlow);
}
}
}
// This method is not thread-safe. The caller must acquire the lock before invoking it.
@VisibleForTesting
RecordFlow evictAckedEventAndScheduleNextEvent(RecordsDeliveryAck recordsDeliveryAck) {
// Peek at the head of the queue on receiving the ack.
// Note: this does not block waiting to retrieve an element.
final RecordsRetrievedContext recordsRetrievedContext = recordsDeliveryQueue.peek();
// RecordFlow of the current event that needs to be returned
RecordFlow flowToBeReturned = null;
// Check if the ack corresponds to the head of the delivery queue.
if (recordsRetrievedContext != null && recordsRetrievedContext.getRecordsRetrieved().batchUniqueIdentifier()
.equals(recordsDeliveryAck.batchUniqueIdentifier())) {
// It is now safe to remove the element
recordsDeliveryQueue.poll();
// Take action based on the time the event spent in the queue.
takeDelayedDeliveryActionIfRequired(shardId, recordsRetrievedContext.getEnqueueTimestamp(), log);
// Update current sequence number for the successfully delivered event.
currentSequenceNumber = ((FanoutRecordsRetrieved)recordsRetrievedContext.getRecordsRetrieved()).continuationSequenceNumber();
// Update the triggering flow for post scheduling upstream request.
flowToBeReturned = recordsRetrievedContext.getRecordFlow();
// Try scheduling the next event in the queue, if available.
if (recordsDeliveryQueue.peek() != null) {
subscriber.onNext(recordsDeliveryQueue.peek().getRecordsRetrieved());
}
} else {
// Check if the mismatched event belongs to the active flow. If the publisher receives an ack for a
// missing event in the active flow, it means the event was already acked or was cleared
// from the queue due to a potential bug.
if (flow != null && recordsDeliveryAck.batchUniqueIdentifier().getFlowIdentifier()
.equals(flow.getSubscribeToShardId())) {
log.error(
"{}: Received unexpected ack for the active subscription {}. Throwing. ",
shardId, recordsDeliveryAck.batchUniqueIdentifier().getFlowIdentifier());
throw new IllegalStateException("Unexpected ack for the active subscription");
}
// Otherwise publisher received a stale ack.
else {
log.info("{}: Publisher received an ack for stale subscription {}. Ignoring.", shardId,
recordsDeliveryAck.batchUniqueIdentifier().getFlowIdentifier());
}
}
return flowToBeReturned;
}
// This method is not thread-safe. The caller must acquire the lock before invoking it.
@VisibleForTesting
void bufferCurrentEventAndScheduleIfRequired(RecordsRetrieved recordsRetrieved, RecordFlow triggeringFlow) {
final RecordsRetrievedContext recordsRetrievedContext =
new RecordsRetrievedContext(recordsRetrieved, triggeringFlow, Instant.now());
try {
// Try enqueueing the RecordsRetrieved batch in the queue; this throws an exception on failure.
// Note: this does not block waiting to enqueue.
recordsDeliveryQueue.add(recordsRetrievedContext);
// If the current batch is the only element in the queue, then try scheduling the event delivery.
if (recordsDeliveryQueue.size() == 1) {
subscriber.onNext(recordsRetrieved);
}
} catch (IllegalStateException e) {
log.warn("{}: Unable to enqueue the payload due to capacity restrictions in delivery queue with remaining capacity {} ",
shardId, recordsDeliveryQueue.remainingCapacity());
throw e;
} catch (Throwable t) {
log.error("{}: Unable to deliver event to the shard consumer.", shardId, t);
throw t;
}
}
@Data
private static final class RecordsRetrievedContext {
private final RecordsRetrieved recordsRetrieved;
private final RecordFlow recordFlow;
private final Instant enqueueTimestamp;
}
private boolean hasValidSubscriber() {
return subscriber != null;
}
private boolean hasValidFlow() {
return flow != null;
}
private void subscribeToShard(String sequenceNumber) {
synchronized (lockObject) {
// Clear the queue so that any stale entries from the previous subscription are discarded.
clearRecordsDeliveryQueue();
SubscribeToShardRequest.Builder builder = KinesisRequestsBuilder.subscribeToShardRequestBuilder()
.shardId(shardId).consumerARN(consumerArn);
SubscribeToShardRequest request;
@@ -143,12 +248,23 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
private void errorOccurred(RecordFlow triggeringFlow, Throwable t) {
synchronized (lockObject) {
if (!hasValidSubscriber()) {
log.warn(
"{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ {} id: {} -- Subscriber is null",
shardId, flow.connectionStartedAt, flow.subscribeToShardId);
if(hasValidFlow()) {
log.warn(
"{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ {} id: {} -- Subscriber is null",
shardId, flow.connectionStartedAt, flow.subscribeToShardId);
} else {
log.warn(
"{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) -- Subscriber and flow are null",
shardId);
}
return;
}
// Clear the delivery buffer so that the next subscription doesn't yield duplicate records.
clearRecordsDeliveryQueue();
Throwable propagationThrowable = t;
ThrowableCategory category = throwableCategory(propagationThrowable);
@@ -178,7 +294,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
availableQueueSpace = 0;
try {
handleFlowError(propagationThrowable);
handleFlowError(propagationThrowable, triggeringFlow);
} catch (Throwable innerThrowable) {
log.warn("{}: Exception while calling subscriber.onError", shardId, innerThrowable);
}
@@ -197,6 +313,10 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
}
}
private void clearRecordsDeliveryQueue() {
recordsDeliveryQueue.clear();
}
protected void logAcquireTimeoutMessage(Throwable t) {
log.error("An acquire timeout occurred which usually indicates that the KinesisAsyncClient supplied has a " +
"low maximum streams limit. " +
@@ -204,13 +324,16 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
"or refer to the class to setup the client manually.");
}
private void handleFlowError(Throwable t) {
private void handleFlowError(Throwable t, RecordFlow triggeringFlow) {
if (t.getCause() instanceof ResourceNotFoundException) {
log.debug(
"{}: Could not call SubscribeToShard successfully because shard no longer exists. Marking shard for completion.",
shardId);
// The ack received for this onNext event will be ignored by the publisher as the global flow object should
// be either null or renewed when the ack's flow identifier is evaluated.
FanoutRecordsRetrieved response = new FanoutRecordsRetrieved(
ProcessRecordsInput.builder().records(Collections.emptyList()).isAtShardEnd(true).build(), null);
ProcessRecordsInput.builder().records(Collections.emptyList()).isAtShardEnd(true).build(), null,
triggeringFlow != null ? triggeringFlow.getSubscribeToShardId() : shardId + "-no-flow-found");
subscriber.onNext(response);
subscriber.onComplete();
} else {
@@ -294,28 +417,26 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
.millisBehindLatest(recordBatchEvent.millisBehindLatest())
.isAtShardEnd(recordBatchEvent.continuationSequenceNumber() == null).records(records).build();
FanoutRecordsRetrieved recordsRetrieved = new FanoutRecordsRetrieved(input,
recordBatchEvent.continuationSequenceNumber());
recordBatchEvent.continuationSequenceNumber(), triggeringFlow.subscribeToShardId);
try {
subscriber.onNext(recordsRetrieved);
//
// Only advance the currentSequenceNumber if we successfully dispatch the last received input
//
currentSequenceNumber = recordBatchEvent.continuationSequenceNumber();
bufferCurrentEventAndScheduleIfRequired(recordsRetrieved, triggeringFlow);
} catch (Throwable t) {
log.warn("{}: Unable to call onNext for subscriber. Failing publisher.", shardId);
log.warn("{}: Unable to buffer or schedule onNext for subscriber. Failing publisher.", shardId);
errorOccurred(triggeringFlow, t);
}
}
}
if (availableQueueSpace <= 0) {
log.debug(
"{}: [SubscriptionLifetime] (FanOutRecordsPublisher#recordsReceived) @ {} id: {} -- Attempted to decrement availableQueueSpace to below 0",
shardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId);
} else {
availableQueueSpace--;
if (availableQueueSpace > 0) {
triggeringFlow.request(1);
}
private void updateAvailableQueueSpaceAndRequestUpstream(RecordFlow triggeringFlow) {
if (availableQueueSpace <= 0) {
log.debug(
"{}: [SubscriptionLifetime] (FanOutRecordsPublisher#recordsReceived) @ {} id: {} -- Attempted to decrement availableQueueSpace to below 0",
shardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId);
} else {
availableQueueSpace--;
if (availableQueueSpace > 0) {
triggeringFlow.request(1);
}
}
}
@@ -489,11 +610,18 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
private final ProcessRecordsInput processRecordsInput;
private final String continuationSequenceNumber;
private final String flowIdentifier;
private final String batchUniqueIdentifier = UUID.randomUUID().toString();
@Override
public ProcessRecordsInput processRecordsInput() {
return processRecordsInput;
}
@Override
public BatchUniqueIdentifier batchUniqueIdentifier() {
return new BatchUniqueIdentifier(batchUniqueIdentifier, flowIdentifier);
}
}
@RequiredArgsConstructor
@@ -502,6 +630,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
private final FanOutRecordsPublisher parent;
private final Instant connectionStartedAt;
@Getter @VisibleForTesting
private final String subscribeToShardId;
private RecordSubscription subscription;
@@ -734,5 +863,4 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
}
}
}
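The ack-matching rules in evictAckedEventAndScheduleNextEvent above condense to three cases: an ack matching the queue head evicts it and schedules the next event, an unmatched ack on the active flow signals a bug and throws, and anything else is a stale ack to ignore. A reduced model under those assumptions, using a Java record and plain strings in place of BatchUniqueIdentifier and RecordFlow:

import java.util.ArrayDeque;
import java.util.Queue;

// Reduced model of the ack-matching decision; not the KCL implementation.
class AckMatchingModel {
    record Ack(String batchId, String flowId) { }

    private final Queue<Ack> deliveryQueue = new ArrayDeque<>();
    private String activeFlowId = "flow-2";

    void onAck(Ack ack) {
        Ack head = deliveryQueue.peek();
        if (ack.equals(head)) {
            deliveryQueue.poll();   // matched head: evict, then deliver the next event
        } else if (ack.flowId().equals(activeFlowId)) {
            // Unexpected ack for the live subscription: the event was lost or double-acked.
            throw new IllegalStateException("Unexpected ack for the active subscription");
        }
        // Otherwise the ack belongs to an old subscription and is ignored.
    }
}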

@@ -20,6 +20,7 @@ import java.time.Instant;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -49,11 +50,14 @@ import software.amazon.kinesis.metrics.MetricsUtil;
import software.amazon.kinesis.metrics.ThreadSafeMetricsDelegatingFactory;
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
import software.amazon.kinesis.retrieval.KinesisClientRecord;
import software.amazon.kinesis.retrieval.RecordsDeliveryAck;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RecordsRetrieved;
import software.amazon.kinesis.retrieval.RetryableRetrievalException;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import static software.amazon.kinesis.common.DiagnosticUtils.takeDelayedDeliveryActionIfRequired;
/**
* This is the prefetch caching class; it spins up a thread if prefetching is enabled. That thread fetches the
* next set of records and stores it in the cache. The size of the cache is limited by setting
@@ -93,6 +97,9 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
private final ReentrantReadWriteLock resetLock = new ReentrantReadWriteLock();
private boolean wasReset = false;
private final Semaphore eventDeliveryLock = new Semaphore(1);
private Instant eventDeliveryLockAcquireTime = Instant.EPOCH;
/**
* Constructor for the PrefetchRecordsPublisher. This cache prefetches records from Kinesis and stores them in a
* LinkedBlockingQueue.
@@ -216,6 +223,13 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
});
}
@Override
public void notify(RecordsDeliveryAck ack) {
eventDeliveryLock.release();
// Take action based on the time the event spent in the queue.
takeDelayedDeliveryActionIfRequired(shardId, eventDeliveryLockAcquireTime, log);
}
private void addArrivedRecordsInput(PrefetchRecordsRetrieved recordsRetrieved) throws InterruptedException {
wasReset = false;
while (!getRecordsResultQueue.offer(recordsRetrieved, idleMillisBetweenCalls, TimeUnit.MILLISECONDS)) {
@@ -236,6 +250,8 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
private synchronized void drainQueueForRequests() {
while (requestedResponses.get() > 0 && !getRecordsResultQueue.isEmpty()) {
eventDeliveryLock.acquireUninterruptibly();
eventDeliveryLockAcquireTime = Instant.now();
subscriber.onNext(getNextResult());
}
}
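Unlike the fan-out path, the prefetch publisher gates delivery with a single-permit semaphore: drainQueueForRequests acquires the permit before each onNext, and notify releases it when the ack comes back, so at most one event is in flight at a time. A standalone sketch of that pattern under illustrative names:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.function.Consumer;

// One-permit semaphore gating delivery, mirroring PrefetchRecordsPublisher above.
class SingleInFlightDelivery {
    private final Semaphore eventDeliveryLock = new Semaphore(1);
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final Consumer<String> subscriber;

    SingleInFlightDelivery(Consumer<String> subscriber) {
        this.subscriber = subscriber;
    }

    void drainQueueForRequests() throws InterruptedException {
        while (!queue.isEmpty()) {
            eventDeliveryLock.acquireUninterruptibly(); // block until the previous event is acked
            subscriber.accept(queue.take());
        }
    }

    void ack() {
        eventDeliveryLock.release(); // the next event may now be delivered
    }
}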

@@ -64,6 +64,7 @@ import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.retrieval.KinesisClientRecord;
import software.amazon.kinesis.retrieval.RecordsDeliveryAck;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RecordsRetrieved;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
@@ -554,6 +555,11 @@ public class ShardConsumerSubscriberTest {
}
@Override
public void notify(RecordsDeliveryAck ack) {
}
@Override
public void shutdown() {

@@ -74,6 +74,7 @@ import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.TaskExecutionListenerInput;
import software.amazon.kinesis.lifecycle.ConsumerStates.ShardConsumerState;
import software.amazon.kinesis.retrieval.RecordsDeliveryAck;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RecordsRetrieved;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
@@ -198,6 +199,11 @@ public class ShardConsumerTest {
}
@Override
public void notify(RecordsDeliveryAck ack) {
}
@Override
public void shutdown() {

@@ -1,27 +1,14 @@
package software.amazon.kinesis.retrieval.fanout;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.handler.timeout.ReadTimeoutException;
import io.reactivex.Flowable;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subscribers.SafeSubscriber;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeDiagnosingMatcher;
@@ -29,13 +16,10 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import io.netty.handler.timeout.ReadTimeoutException;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.core.exception.SdkClientException;
@@ -49,12 +33,51 @@ import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.lifecycle.ShardConsumerNotifyingSubscriber;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.retrieval.BatchUniqueIdentifier;
import software.amazon.kinesis.retrieval.KinesisClientRecord;
import software.amazon.kinesis.retrieval.RecordsRetrieved;
import software.amazon.kinesis.retrieval.RetryableRetrievalException;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@RunWith(MockitoJUnitRunner.class)
@Slf4j
public class FanOutRecordsPublisherTest {
@@ -89,32 +112,28 @@ public class FanOutRecordsPublisherTest {
List<ProcessRecordsInput> receivedInput = new ArrayList<>();
source.subscribe(new Subscriber<RecordsRetrieved>() {
source.subscribe(new ShardConsumerNotifyingSubscriber(new Subscriber<RecordsRetrieved>() {
Subscription subscription;
@Override
public void onSubscribe(Subscription s) {
@Override public void onSubscribe(Subscription s) {
subscription = s;
subscription.request(1);
}
@Override
public void onNext(RecordsRetrieved input) {
@Override public void onNext(RecordsRetrieved input) {
receivedInput.add(input.processRecordsInput());
subscription.request(1);
}
@Override
public void onError(Throwable t) {
@Override public void onError(Throwable t) {
log.error("Caught throwable in subscriber", t);
fail("Caught throwable in subscriber");
}
@Override
public void onComplete() {
@Override public void onComplete() {
fail("OnComplete called when not expected");
}
});
}, source));
verify(kinesisClient).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture());
flowCaptor.getValue().onEventStream(publisher);
@@ -142,6 +161,473 @@
}
@Test
public void testIfAllEventsReceivedWhenNoTasksRejectedByExecutor() throws Exception {
FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN);
ArgumentCaptor<FanOutRecordsPublisher.RecordSubscription> captor = ArgumentCaptor
.forClass(FanOutRecordsPublisher.RecordSubscription.class);
ArgumentCaptor<FanOutRecordsPublisher.RecordFlow> flowCaptor = ArgumentCaptor
.forClass(FanOutRecordsPublisher.RecordFlow.class);
doNothing().when(publisher).subscribe(captor.capture());
source.start(ExtendedSequenceNumber.LATEST,
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST));
List<ProcessRecordsInput> receivedInput = new ArrayList<>();
Subscriber<RecordsRetrieved> shardConsumerSubscriber = new ShardConsumerNotifyingSubscriber(
new Subscriber<RecordsRetrieved>() {
Subscription subscription;
@Override public void onSubscribe(Subscription s) {
subscription = s;
subscription.request(1);
}
@Override public void onNext(RecordsRetrieved input) {
receivedInput.add(input.processRecordsInput());
subscription.request(1);
}
@Override public void onError(Throwable t) {
log.error("Caught throwable in subscriber", t);
fail("Caught throwable in subscriber");
}
@Override public void onComplete() {
fail("OnComplete called when not expected");
}
}, source);
Scheduler testScheduler = getScheduler(getBlockingExecutor(getSpiedExecutor(getTestExecutor())));
int bufferSize = 8;
Flowable.fromPublisher(source).subscribeOn(testScheduler).observeOn(testScheduler, true, bufferSize)
.subscribe(shardConsumerSubscriber);
verify(kinesisClient).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture());
flowCaptor.getValue().onEventStream(publisher);
captor.getValue().onSubscribe(subscription);
List<Record> records = Stream.of(1, 2, 3).map(this::makeRecord).collect(Collectors.toList());
List<KinesisClientRecordMatcher> matchers = records.stream().map(KinesisClientRecordMatcher::new)
.collect(Collectors.toList());
Stream.of("1000", "2000", "3000")
.map(contSeqNum ->
SubscribeToShardEvent.builder()
.millisBehindLatest(100L)
.continuationSequenceNumber(contSeqNum)
.records(records).build())
.forEach(batchEvent -> captor.getValue().onNext(batchEvent));
verify(subscription, times(4)).request(1);
assertThat(receivedInput.size(), equalTo(3));
receivedInput.stream().map(ProcessRecordsInput::records).forEach(clientRecordsList -> {
assertThat(clientRecordsList.size(), equalTo(matchers.size()));
for (int i = 0; i < clientRecordsList.size(); ++i) {
assertThat(clientRecordsList.get(i), matchers.get(i));
}
});
assertThat(source.getCurrentSequenceNumber(), equalTo("3000"));
}
@Test
public void testIfEventsAreNotDeliveredToShardConsumerWhenPreviousEventDeliveryTaskGetsRejected() throws Exception {
FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN);
ArgumentCaptor<FanOutRecordsPublisher.RecordSubscription> captor = ArgumentCaptor
.forClass(FanOutRecordsPublisher.RecordSubscription.class);
ArgumentCaptor<FanOutRecordsPublisher.RecordFlow> flowCaptor = ArgumentCaptor
.forClass(FanOutRecordsPublisher.RecordFlow.class);
doNothing().when(publisher).subscribe(captor.capture());
source.start(ExtendedSequenceNumber.LATEST,
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST));
List<ProcessRecordsInput> receivedInput = new ArrayList<>();
Subscriber<RecordsRetrieved> shardConsumerSubscriber = new ShardConsumerNotifyingSubscriber(
new Subscriber<RecordsRetrieved>() {
Subscription subscription;
@Override public void onSubscribe(Subscription s) {
subscription = s;
subscription.request(1);
}
@Override public void onNext(RecordsRetrieved input) {
receivedInput.add(input.processRecordsInput());
subscription.request(1);
}
@Override public void onError(Throwable t) {
log.error("Caught throwable in subscriber", t);
fail("Caught throwable in subscriber");
}
@Override public void onComplete() {
fail("OnComplete called when not expected");
}
}, source);
Scheduler testScheduler = getScheduler(getOverwhelmedBlockingExecutor(getSpiedExecutor(getTestExecutor())));
int bufferSize = 8;
Flowable.fromPublisher(source).subscribeOn(testScheduler).observeOn(testScheduler, true, bufferSize)
.subscribe(new SafeSubscriber<>(shardConsumerSubscriber));
verify(kinesisClient).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture());
flowCaptor.getValue().onEventStream(publisher);
captor.getValue().onSubscribe(subscription);
List<Record> records = Stream.of(1, 2, 3).map(this::makeRecord).collect(Collectors.toList());
List<KinesisClientRecordMatcher> matchers = records.stream().map(KinesisClientRecordMatcher::new)
.collect(Collectors.toList());
Stream.of("1000", "2000", "3000")
.map(contSeqNum ->
SubscribeToShardEvent.builder()
.millisBehindLatest(100L)
.continuationSequenceNumber(contSeqNum)
.records(records).build())
.forEach(batchEvent -> captor.getValue().onNext(batchEvent));
verify(subscription, times(2)).request(1);
assertThat(receivedInput.size(), equalTo(1));
receivedInput.stream().map(ProcessRecordsInput::records).forEach(clientRecordsList -> {
assertThat(clientRecordsList.size(), equalTo(matchers.size()));
for (int i = 0; i < clientRecordsList.size(); ++i) {
assertThat(clientRecordsList.get(i), matchers.get(i));
}
});
assertThat(source.getCurrentSequenceNumber(), equalTo("1000"));
}
@Test
public void testIfStreamOfEventsAreDeliveredInOrderWithBackpressureAdheringServicePublisher() throws Exception {
FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN);
ArgumentCaptor<FanOutRecordsPublisher.RecordSubscription> captor = ArgumentCaptor
.forClass(FanOutRecordsPublisher.RecordSubscription.class);
ArgumentCaptor<FanOutRecordsPublisher.RecordFlow> flowCaptor = ArgumentCaptor
.forClass(FanOutRecordsPublisher.RecordFlow.class);
List<Record> records = Stream.of(1, 2, 3).map(this::makeRecord).collect(Collectors.toList());
Consumer<Integer> servicePublisherAction = contSeqNum -> captor.getValue().onNext(
SubscribeToShardEvent.builder()
.millisBehindLatest(100L)
.continuationSequenceNumber(contSeqNum + "")
.records(records)
.build());
CountDownLatch servicePublisherTaskCompletionLatch = new CountDownLatch(2);
int totalServicePublisherEvents = 1000;
int initialDemand = 0;
BackpressureAdheringServicePublisher servicePublisher =
new BackpressureAdheringServicePublisher(servicePublisherAction, totalServicePublisherEvents, servicePublisherTaskCompletionLatch, initialDemand);
doNothing().when(publisher).subscribe(captor.capture());
source.start(ExtendedSequenceNumber.LATEST,
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST));
List<ProcessRecordsInput> receivedInput = new ArrayList<>();
Subscriber<RecordsRetrieved> shardConsumerSubscriber = new ShardConsumerNotifyingSubscriber(
new Subscriber<RecordsRetrieved>() {
private Subscription subscription;
private int lastSeenSeqNum = 0;
@Override public void onSubscribe(Subscription s) {
subscription = s;
subscription.request(1);
servicePublisher.request(1);
}
@Override public void onNext(RecordsRetrieved input) {
receivedInput.add(input.processRecordsInput());
assertEquals("" + ++lastSeenSeqNum, ((FanOutRecordsPublisher.FanoutRecordsRetrieved)input).continuationSequenceNumber());
subscription.request(1);
servicePublisher.request(1);
if(receivedInput.size() == totalServicePublisherEvents) {
servicePublisherTaskCompletionLatch.countDown();
}
}
@Override public void onError(Throwable t) {
log.error("Caught throwable in subscriber", t);
fail("Caught throwable in subscriber");
}
@Override public void onComplete() {
fail("OnComplete called when not expected");
}
}, source);
ExecutorService executorService = getTestExecutor();
Scheduler testScheduler = getScheduler(getInitiallyBlockingExecutor(getSpiedExecutor(executorService)));
int bufferSize = 8;
Flowable.fromPublisher(source).subscribeOn(testScheduler).observeOn(testScheduler, true, bufferSize)
.subscribe(shardConsumerSubscriber);
verify(kinesisClient).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture());
flowCaptor.getValue().onEventStream(publisher);
captor.getValue().onSubscribe(subscription);
List<KinesisClientRecordMatcher> matchers = records.stream().map(KinesisClientRecordMatcher::new)
.collect(Collectors.toList());
executorService.submit(servicePublisher);
servicePublisherTaskCompletionLatch.await(5000, TimeUnit.MILLISECONDS);
assertThat(receivedInput.size(), equalTo(totalServicePublisherEvents));
receivedInput.stream().map(ProcessRecordsInput::records).forEach(clientRecordsList -> {
assertThat(clientRecordsList.size(), equalTo(matchers.size()));
for (int i = 0; i < clientRecordsList.size(); ++i) {
assertThat(clientRecordsList.get(i), matchers.get(i));
}
});
assertThat(source.getCurrentSequenceNumber(), equalTo(totalServicePublisherEvents + ""));
}
@Test
public void testIfStreamOfEventsAreDeliveredInOrderWithBackpressureAdheringServicePublisherHavingInitialBurstWithinLimit() throws Exception {
FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN);
ArgumentCaptor<FanOutRecordsPublisher.RecordSubscription> captor = ArgumentCaptor
.forClass(FanOutRecordsPublisher.RecordSubscription.class);
ArgumentCaptor<FanOutRecordsPublisher.RecordFlow> flowCaptor = ArgumentCaptor
.forClass(FanOutRecordsPublisher.RecordFlow.class);
List<Record> records = Stream.of(1, 2, 3).map(this::makeRecord).collect(Collectors.toList());
Consumer<Integer> servicePublisherAction = contSeqNum -> captor.getValue().onNext(
SubscribeToShardEvent.builder()
.millisBehindLatest(100L)
.continuationSequenceNumber(contSeqNum + "")
.records(records)
.build());
CountDownLatch servicePublisherTaskCompletionLatch = new CountDownLatch(2);
int totalServicePublisherEvents = 1000;
int initialDemand = 9;
BackpressureAdheringServicePublisher servicePublisher =
new BackpressureAdheringServicePublisher(servicePublisherAction, totalServicePublisherEvents, servicePublisherTaskCompletionLatch, initialDemand);
doNothing().when(publisher).subscribe(captor.capture());
source.start(ExtendedSequenceNumber.LATEST,
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST));
List<ProcessRecordsInput> receivedInput = new ArrayList<>();
Subscriber<RecordsRetrieved> shardConsumerSubscriber = new ShardConsumerNotifyingSubscriber(
new Subscriber<RecordsRetrieved>() {
private Subscription subscription;
private int lastSeenSeqNum = 0;
@Override public void onSubscribe(Subscription s) {
subscription = s;
subscription.request(1);
servicePublisher.request(1);
}
@Override public void onNext(RecordsRetrieved input) {
receivedInput.add(input.processRecordsInput());
assertEquals("" + ++lastSeenSeqNum, ((FanOutRecordsPublisher.FanoutRecordsRetrieved)input).continuationSequenceNumber());
subscription.request(1);
servicePublisher.request(1);
if(receivedInput.size() == totalServicePublisherEvents) {
servicePublisherTaskCompletionLatch.countDown();
}
}
@Override public void onError(Throwable t) {
log.error("Caught throwable in subscriber", t);
fail("Caught throwable in subscriber");
}
@Override public void onComplete() {
fail("OnComplete called when not expected");
}
}, source);
ExecutorService executorService = getTestExecutor();
Scheduler testScheduler = getScheduler(getInitiallyBlockingExecutor(getSpiedExecutor(executorService)));
int bufferSize = 8;
Flowable.fromPublisher(source).subscribeOn(testScheduler).observeOn(testScheduler, true, bufferSize)
.subscribe(shardConsumerSubscriber);
verify(kinesisClient).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture());
flowCaptor.getValue().onEventStream(publisher);
captor.getValue().onSubscribe(subscription);
List<KinesisClientRecordMatcher> matchers = records.stream().map(KinesisClientRecordMatcher::new)
.collect(Collectors.toList());
executorService.submit(servicePublisher);
servicePublisherTaskCompletionLatch.await(5000, TimeUnit.MILLISECONDS);
assertThat(receivedInput.size(), equalTo(totalServicePublisherEvents));
receivedInput.stream().map(ProcessRecordsInput::records).forEach(clientRecordsList -> {
assertThat(clientRecordsList.size(), equalTo(matchers.size()));
for (int i = 0; i < clientRecordsList.size(); ++i) {
assertThat(clientRecordsList.get(i), matchers.get(i));
}
});
assertThat(source.getCurrentSequenceNumber(), equalTo(totalServicePublisherEvents + ""));
}
@Test
public void testIfStreamOfEventsAreDeliveredInOrderWithBackpressureAdheringServicePublisherHavingInitialBurstOverLimit() throws Exception {
FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN);
ArgumentCaptor<FanOutRecordsPublisher.RecordSubscription> captor = ArgumentCaptor
.forClass(FanOutRecordsPublisher.RecordSubscription.class);
ArgumentCaptor<FanOutRecordsPublisher.RecordFlow> flowCaptor = ArgumentCaptor
.forClass(FanOutRecordsPublisher.RecordFlow.class);
List<Record> records = Stream.of(1, 2, 3).map(this::makeRecord).collect(Collectors.toList());
Consumer<Integer> servicePublisherAction = contSeqNum -> captor.getValue().onNext(
SubscribeToShardEvent.builder()
.millisBehindLatest(100L)
.continuationSequenceNumber(contSeqNum + "")
.records(records)
.build());
CountDownLatch servicePublisherTaskCompletionLatch = new CountDownLatch(1);
int totalServicePublisherEvents = 1000;
int initialDemand = 10;
BackpressureAdheringServicePublisher servicePublisher =
new BackpressureAdheringServicePublisher(servicePublisherAction, totalServicePublisherEvents, servicePublisherTaskCompletionLatch, initialDemand);
doNothing().when(publisher).subscribe(captor.capture());
source.start(ExtendedSequenceNumber.LATEST,
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST));
List<ProcessRecordsInput> receivedInput = new ArrayList<>();
AtomicBoolean onErrorSet = new AtomicBoolean(false);
Subscriber<RecordsRetrieved> shardConsumerSubscriber = new ShardConsumerNotifyingSubscriber(
new Subscriber<RecordsRetrieved>() {
private Subscription subscription;
private int lastSeenSeqNum = 0;
@Override public void onSubscribe(Subscription s) {
subscription = s;
subscription.request(1);
servicePublisher.request(1);
}
@Override public void onNext(RecordsRetrieved input) {
receivedInput.add(input.processRecordsInput());
assertEquals("" + ++lastSeenSeqNum, ((FanOutRecordsPublisher.FanoutRecordsRetrieved)input).continuationSequenceNumber());
subscription.request(1);
servicePublisher.request(1);
}
@Override public void onError(Throwable t) {
log.error("Caught throwable in subscriber", t);
onErrorSet.set(true);
servicePublisherTaskCompletionLatch.countDown();
}
@Override public void onComplete() {
fail("OnComplete called when not expected");
}
}, source);
ExecutorService executorService = getTestExecutor();
Scheduler testScheduler = getScheduler(getInitiallyBlockingExecutor(getSpiedExecutor(executorService)));
int bufferSize = 8;
Flowable.fromPublisher(source).subscribeOn(testScheduler).observeOn(testScheduler, true, bufferSize)
.subscribe(shardConsumerSubscriber);
verify(kinesisClient).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture());
flowCaptor.getValue().onEventStream(publisher);
captor.getValue().onSubscribe(subscription);
List<KinesisClientRecordMatcher> matchers = records.stream().map(KinesisClientRecordMatcher::new)
.collect(Collectors.toList());
executorService.submit(servicePublisher);
servicePublisherTaskCompletionLatch.await(5000, TimeUnit.MILLISECONDS);
assertTrue("onError should have triggered", onErrorSet.get());
receivedInput.stream().map(ProcessRecordsInput::records).forEach(clientRecordsList -> {
assertThat(clientRecordsList.size(), equalTo(matchers.size()));
for (int i = 0; i < clientRecordsList.size(); ++i) {
assertThat(clientRecordsList.get(i), matchers.get(i));
}
});
}
private Scheduler getScheduler(ExecutorService executorService) {
return Schedulers.from(executorService);
}
private ExecutorService getTestExecutor() {
return Executors.newFixedThreadPool(8,
new ThreadFactoryBuilder().setNameFormat("test-fanout-record-publisher-%04d").setDaemon(true).build());
}
private ExecutorService getSpiedExecutor(ExecutorService executorService) {
return spy(executorService);
}
private ExecutorService getBlockingExecutor(ExecutorService executorService) {
doAnswer(invocation -> directlyExecuteRunnable(invocation)).when(executorService).execute(any());
return executorService;
}
private ExecutorService getInitiallyBlockingExecutor(ExecutorService executorService) {
doAnswer(invocation -> directlyExecuteRunnable(invocation))
.doAnswer(invocation -> directlyExecuteRunnable(invocation))
.doCallRealMethod()
.when(executorService).execute(any());
return executorService;
}
private ExecutorService getOverwhelmedBlockingExecutor(ExecutorService executorService) {
doAnswer(invocation -> directlyExecuteRunnable(invocation))
.doAnswer(invocation -> directlyExecuteRunnable(invocation))
.doAnswer(invocation -> directlyExecuteRunnable(invocation))
.doThrow(new RejectedExecutionException())
.doAnswer(invocation -> directlyExecuteRunnable(invocation))
.when(executorService).execute(any());
return executorService;
}
private Object directlyExecuteRunnable(InvocationOnMock invocation) {
Object[] args = invocation.getArguments();
Runnable runnable = (Runnable) args[0];
runnable.run();
return null;
}
@Test
public void largeRequestTest() throws Exception {
FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN);
@@ -158,32 +644,28 @@ public class FanOutRecordsPublisherTest {
List<ProcessRecordsInput> receivedInput = new ArrayList<>();
source.subscribe(new Subscriber<RecordsRetrieved>() {
source.subscribe(new ShardConsumerNotifyingSubscriber(new Subscriber<RecordsRetrieved>() {
Subscription subscription;
@Override
public void onSubscribe(Subscription s) {
@Override public void onSubscribe(Subscription s) {
subscription = s;
subscription.request(3);
}
@Override
public void onNext(RecordsRetrieved input) {
@Override public void onNext(RecordsRetrieved input) {
receivedInput.add(input.processRecordsInput());
subscription.request(1);
}
@Override
public void onError(Throwable t) {
@Override public void onError(Throwable t) {
log.error("Caught throwable in subscriber", t);
fail("Caught throwable in subscriber");
}
@Override
public void onComplete() {
@Override public void onComplete() {
fail("OnComplete called when not expected");
}
});
}, source));
verify(kinesisClient).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture());
flowCaptor.getValue().onEventStream(publisher);
@@ -270,7 +752,7 @@
NonFailingSubscriber nonFailingSubscriber = new NonFailingSubscriber();
source.subscribe(nonFailingSubscriber);
source.subscribe(new ShardConsumerNotifyingSubscriber(nonFailingSubscriber, source));
SubscribeToShardRequest expected = SubscribeToShardRequest.builder().consumerARN(CONSUMER_ARN).shardId(SHARD_ID)
.startingPosition(StartingPosition.builder().sequenceNumber("0")
@@ -327,6 +809,171 @@
}
@Test
public void testIfBufferingRecordsWithinCapacityPublishesOneEvent() {
FanOutRecordsPublisher fanOutRecordsPublisher = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN);
RecordsRetrieved recordsRetrieved = ProcessRecordsInput.builder()::build;
FanOutRecordsPublisher.RecordFlow recordFlow =
new FanOutRecordsPublisher.RecordFlow(fanOutRecordsPublisher, Instant.now(), "shard-001-001");
final int[] totalRecordsRetrieved = { 0 };
fanOutRecordsPublisher.subscribe(new Subscriber<RecordsRetrieved>() {
@Override public void onSubscribe(Subscription subscription) {}
@Override public void onNext(RecordsRetrieved recordsRetrieved) {
totalRecordsRetrieved[0]++;
}
@Override public void onError(Throwable throwable) {}
@Override public void onComplete() {}
});
IntStream.rangeClosed(1, 10).forEach(i -> fanOutRecordsPublisher.bufferCurrentEventAndScheduleIfRequired(recordsRetrieved, recordFlow));
assertEquals(1, totalRecordsRetrieved[0]);
}
@Test
public void testIfBufferingRecordsOverCapacityPublishesOneEventAndThrows() {
FanOutRecordsPublisher fanOutRecordsPublisher = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN);
RecordsRetrieved recordsRetrieved = ProcessRecordsInput.builder()::build;
FanOutRecordsPublisher.RecordFlow recordFlow =
new FanOutRecordsPublisher.RecordFlow(fanOutRecordsPublisher, Instant.now(), "shard-001");
final int[] totalRecordsRetrieved = { 0 };
fanOutRecordsPublisher.subscribe(new Subscriber<RecordsRetrieved>() {
@Override public void onSubscribe(Subscription subscription) {}
@Override public void onNext(RecordsRetrieved recordsRetrieved) {
totalRecordsRetrieved[0]++;
}
@Override public void onError(Throwable throwable) {}
@Override public void onComplete() {}
});
try {
IntStream.rangeClosed(1, 11).forEach(
i -> fanOutRecordsPublisher.bufferCurrentEventAndScheduleIfRequired(recordsRetrieved, recordFlow));
fail("Should throw Queue full exception");
} catch (IllegalStateException e) {
assertEquals("Queue full", e.getMessage());
}
assertEquals(1, totalRecordsRetrieved[0]);
}
@Test
public void testIfPublisherAlwaysPublishesWhenQueueIsEmpty() {
FanOutRecordsPublisher fanOutRecordsPublisher = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN);
FanOutRecordsPublisher.RecordFlow recordFlow =
new FanOutRecordsPublisher.RecordFlow(fanOutRecordsPublisher, Instant.now(), "shard-001");
final int[] totalRecordsRetrieved = { 0 };
fanOutRecordsPublisher.subscribe(new Subscriber<RecordsRetrieved>() {
@Override public void onSubscribe(Subscription subscription) {}
@Override public void onNext(RecordsRetrieved recordsRetrieved) {
totalRecordsRetrieved[0]++;
// This makes sure the queue is immediately made empty, so that the next event enqueued will
// be the only element in the queue.
fanOutRecordsPublisher
.evictAckedEventAndScheduleNextEvent(() -> recordsRetrieved.batchUniqueIdentifier());
}
@Override public void onError(Throwable throwable) {}
@Override public void onComplete() {}
});
IntStream.rangeClosed(1, 137).forEach(i -> fanOutRecordsPublisher.bufferCurrentEventAndScheduleIfRequired(
new FanOutRecordsPublisher.FanoutRecordsRetrieved(ProcessRecordsInput.builder().build(), i + "", recordFlow.getSubscribeToShardId()),
recordFlow));
assertEquals(137, totalRecordsRetrieved[0]);
}
@Test
public void testIfPublisherIgnoresStaleEventsAndContinuesWithNextFlow() {
FanOutRecordsPublisher fanOutRecordsPublisher = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN);
FanOutRecordsPublisher.RecordFlow recordFlow =
new FanOutRecordsPublisher.RecordFlow(fanOutRecordsPublisher, Instant.now(), "shard-001");
final int[] totalRecordsRetrieved = { 0 };
fanOutRecordsPublisher.subscribe(new Subscriber<RecordsRetrieved>() {
@Override public void onSubscribe(Subscription subscription) {}
@Override public void onNext(RecordsRetrieved recordsRetrieved) {
totalRecordsRetrieved[0]++;
// This makes sure the queue is immediately made empty, so that the next event enqueued will
// be the only element in the queue.
fanOutRecordsPublisher
.evictAckedEventAndScheduleNextEvent(() -> recordsRetrieved.batchUniqueIdentifier());
// Send a stale ack periodically
if(totalRecordsRetrieved[0] % 10 == 0) {
fanOutRecordsPublisher.evictAckedEventAndScheduleNextEvent(
() -> new BatchUniqueIdentifier("some_uuid_str", "some_old_flow"));
}
}
@Override public void onError(Throwable throwable) {}
@Override public void onComplete() {}
});
IntStream.rangeClosed(1, 100).forEach(i -> fanOutRecordsPublisher.bufferCurrentEventAndScheduleIfRequired(
new FanOutRecordsPublisher.FanoutRecordsRetrieved(ProcessRecordsInput.builder().build(), i + "", recordFlow.getSubscribeToShardId()),
recordFlow));
assertEquals(100, totalRecordsRetrieved[0]);
}
@Test
public void testIfPublisherIgnoresStaleEventsAndContinuesWithNextFlowWhenDeliveryQueueIsNotEmpty()
throws InterruptedException {
FanOutRecordsPublisher fanOutRecordsPublisher = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN);
FanOutRecordsPublisher.RecordFlow recordFlow =
new FanOutRecordsPublisher.RecordFlow(fanOutRecordsPublisher, Instant.now(), "shard-001");
final int[] totalRecordsRetrieved = { 0 };
BlockingQueue<BatchUniqueIdentifier> ackQueue = new LinkedBlockingQueue<>();
fanOutRecordsPublisher.subscribe(new Subscriber<RecordsRetrieved>() {
@Override public void onSubscribe(Subscription subscription) {}
@Override public void onNext(RecordsRetrieved recordsRetrieved) {
totalRecordsRetrieved[0]++;
// Enqueue the ack for bursty delivery
ackQueue.add(recordsRetrieved.batchUniqueIdentifier());
}
@Override public void onError(Throwable throwable) {}
@Override public void onComplete() {}
});
IntStream.rangeClosed(1, 10).forEach(i -> fanOutRecordsPublisher.bufferCurrentEventAndScheduleIfRequired(
new FanOutRecordsPublisher.FanoutRecordsRetrieved(ProcessRecordsInput.builder().build(), i + "", recordFlow.getSubscribeToShardId()),
recordFlow));
BatchUniqueIdentifier batchUniqueIdentifierQueued;
int count = 0;
// Now that we have allowed up to 10 elements to queue up, send a pair of good and stale acks to verify
// records are delivered as expected.
while(count++ < 10 && (batchUniqueIdentifierQueued = ackQueue.take()) != null) {
final BatchUniqueIdentifier batchUniqueIdentifierFinal = batchUniqueIdentifierQueued;
fanOutRecordsPublisher
.evictAckedEventAndScheduleNextEvent(() -> batchUniqueIdentifierFinal);
fanOutRecordsPublisher.evictAckedEventAndScheduleNextEvent(
() -> new BatchUniqueIdentifier("some_uuid_str", "some_old_flow"));
}
assertEquals(10, totalRecordsRetrieved[0]);
}
@Test(expected = IllegalStateException.class)
public void testIfPublisherThrowsWhenMismatchAckforActiveFlowSeen() throws InterruptedException {
FanOutRecordsPublisher fanOutRecordsPublisher = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN);
FanOutRecordsPublisher.RecordFlow recordFlow =
new FanOutRecordsPublisher.RecordFlow(fanOutRecordsPublisher, Instant.now(), "Shard-001-1");
final int[] totalRecordsRetrieved = { 0 };
BlockingQueue<BatchUniqueIdentifier> ackQueue = new LinkedBlockingQueue<>();
fanOutRecordsPublisher.subscribe(new Subscriber<RecordsRetrieved>() {
@Override public void onSubscribe(Subscription subscription) {}
@Override public void onNext(RecordsRetrieved recordsRetrieved) {
totalRecordsRetrieved[0]++;
// Enqueue the ack for bursty delivery
ackQueue.add(recordsRetrieved.batchUniqueIdentifier());
}
@Override public void onError(Throwable throwable) {}
@Override public void onComplete() {}
});
IntStream.rangeClosed(1, 10).forEach(i -> fanOutRecordsPublisher.bufferCurrentEventAndScheduleIfRequired(
new FanOutRecordsPublisher.FanoutRecordsRetrieved(ProcessRecordsInput.builder().build(), i + "", recordFlow.getSubscribeToShardId()),
recordFlow));
BatchUniqueIdentifier batchUniqueIdentifierQueued;
int count = 0;
// Now that we have allowed up to 10 elements to queue up, send acks with a mismatched batch id on the
// active flow to verify that the publisher throws.
while(count++ < 2 && (batchUniqueIdentifierQueued = ackQueue.poll(1000, TimeUnit.MILLISECONDS)) != null) {
final BatchUniqueIdentifier batchUniqueIdentifierFinal = batchUniqueIdentifierQueued;
fanOutRecordsPublisher.evictAckedEventAndScheduleNextEvent(
() -> new BatchUniqueIdentifier("some_uuid_str", batchUniqueIdentifierFinal.getFlowIdentifier()));
}
}
@Test
public void acquireTimeoutTriggersLogMethodForActiveFlow() {
AtomicBoolean acquireTimeoutLogged = new AtomicBoolean(false);
@@ -477,6 +1124,36 @@
}
}
@RequiredArgsConstructor
private static class BackpressureAdheringServicePublisher implements Runnable {
private final Consumer<Integer> action;
private final Integer numOfTimes;
private final CountDownLatch taskCompletionLatch;
private final Semaphore demandNotifier;
BackpressureAdheringServicePublisher(Consumer<Integer> action, Integer numOfTimes,
CountDownLatch taskCompletionLatch, Integer initialDemand) {
this(action, numOfTimes, taskCompletionLatch, new Semaphore(initialDemand));
}
public void request(int n) {
demandNotifier.release(n);
}
public void run() {
for (int i = 1; i <= numOfTimes; ) {
demandNotifier.acquireUninterruptibly();
action.accept(i++);
}
taskCompletionLatch.countDown();
}
}
private Record makeRecord(String sequenceNumber) {
return makeRecord(Integer.parseInt(sequenceNumber));
}
private Record makeRecord(int sequenceNumber) {
SdkBytes buffer = SdkBytes.fromByteArray(new byte[] { 1, 2, 3 });
return Record.builder().data(buffer).approximateArrivalTimestamp(Instant.now())

@@ -71,6 +71,7 @@ import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.lifecycle.ShardConsumerNotifyingSubscriber;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.metrics.NullMetricsFactory;
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
@@ -299,7 +300,7 @@ public class PrefetchRecordsPublisherTest {
Object lock = new Object();
Subscriber<RecordsRetrieved> subscriber = new Subscriber<RecordsRetrieved>() {
Subscriber<RecordsRetrieved> delegateSubscriber = new Subscriber<RecordsRetrieved>() {
Subscription sub;
@Override
@@ -334,6 +335,8 @@
}
};
Subscriber<RecordsRetrieved> subscriber = new ShardConsumerNotifyingSubscriber(delegateSubscriber, getRecordsCache);
synchronized (lock) {
log.info("Awaiting notification");
Flowable.fromPublisher(getRecordsCache).subscribeOn(Schedulers.computation())