diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/DiagnosticUtils.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/DiagnosticUtils.java new file mode 100644 index 00000000..db0393e1 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/DiagnosticUtils.java @@ -0,0 +1,50 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package software.amazon.kinesis.common; + +import org.slf4j.Logger; +import software.amazon.kinesis.annotations.KinesisClientInternalApi; + +import java.time.Duration; +import java.time.Instant; + +import static software.amazon.kinesis.lifecycle.ShardConsumer.MAX_TIME_BETWEEN_REQUEST_RESPONSE; + +@KinesisClientInternalApi +public class DiagnosticUtils { + + /** + * Util for RecordPublisher to measure the event delivery latency of the executor service and take appropriate action. + * @param shardId of the shard that is having delayed delivery + * @param enqueueTimestamp of the event submitted to the executor service + * @param log Slf4j Logger from RecordPublisher to log the events + */ + public static void takeDelayedDeliveryActionIfRequired(String shardId, Instant enqueueTimestamp, Logger log) { + final long durationBetweenEnqueueAndAckInMillis = Duration + .between(enqueueTimestamp, Instant.now()).toMillis(); + if (durationBetweenEnqueueAndAckInMillis > MAX_TIME_BETWEEN_REQUEST_RESPONSE / 3) { + // The above condition logs the warn msg if the delivery time exceeds 11 seconds. + log.warn( + "{}: Record delivery time to shard consumer is high at {} millis. Check the ExecutorStateEvent logs" + + " to see the state of the executor service. Also check if the RecordProcessor's processing " + + "time is high. ", + shardId, durationBetweenEnqueueAndAckInMillis); + } else if (log.isDebugEnabled()) { + log.debug("{}: Record delivery time to shard consumer is {} millis", shardId, + durationBetweenEnqueueAndAckInMillis); + } + } +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/RejectedTaskEvent.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/RejectedTaskEvent.java index d78b64b6..7dc8dfaf 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/RejectedTaskEvent.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/RejectedTaskEvent.java @@ -26,7 +26,7 @@ import software.amazon.kinesis.annotations.KinesisClientInternalApi; @KinesisClientInternalApi class RejectedTaskEvent implements DiagnosticEvent { private static final String MESSAGE = "Review your thread configuration to prevent task rejections. " + - "Until next release, KCL will not be resilient to task rejections. "; + "Task rejections will slow down your application and some shards may stop processing. "; private ExecutorStateEvent executorStateEvent; private Throwable throwable; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/NotifyingSubscriber.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/NotifyingSubscriber.java new file mode 100644 index 00000000..f3599c71 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/NotifyingSubscriber.java @@ -0,0 +1,68 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package software.amazon.kinesis.lifecycle; + +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import software.amazon.kinesis.retrieval.RecordsPublisher; +import software.amazon.kinesis.retrieval.RecordsRetrieved; +import software.amazon.kinesis.retrieval.RecordsDeliveryAck; + +/** + * Subscriber that notifies its publisher on receipt of the onNext event. + */ +public interface NotifyingSubscriber extends Subscriber { + + /** + * Return the actual subscriber to which the events needs to be delegated. + * @return Subscriber to be delegated + */ + Subscriber getDelegateSubscriber(); + + /** + * Return the publisher to be notified + * @return RecordsPublisher to be notified. + */ + RecordsPublisher getRecordsPublisher(); + + /** + * Construct RecordsDeliveryAck object from the incoming data and return it + * @param recordsRetrieved for which we need the ack. + * @return getRecordsDeliveryAck + */ + RecordsDeliveryAck getRecordsDeliveryAck(RecordsRetrieved recordsRetrieved); + + @Override + default void onSubscribe(Subscription subscription) { + getDelegateSubscriber().onSubscribe(subscription); + } + + @Override + default void onNext(RecordsRetrieved recordsRetrieved) { + getRecordsPublisher().notify(getRecordsDeliveryAck(recordsRetrieved)); + getDelegateSubscriber().onNext(recordsRetrieved); + } + + @Override + default void onError(Throwable throwable) { + getDelegateSubscriber().onError(throwable); + } + + @Override + default void onComplete() { + getDelegateSubscriber().onComplete(); + } +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerNotifyingSubscriber.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerNotifyingSubscriber.java new file mode 100644 index 00000000..3ef9fc1d --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerNotifyingSubscriber.java @@ -0,0 +1,47 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package software.amazon.kinesis.lifecycle; + +import lombok.AllArgsConstructor; +import org.reactivestreams.Subscriber; +import software.amazon.kinesis.annotations.KinesisClientInternalApi; +import software.amazon.kinesis.retrieval.RecordsPublisher; +import software.amazon.kinesis.retrieval.RecordsRetrieved; +import software.amazon.kinesis.retrieval.RecordsDeliveryAck; + +@KinesisClientInternalApi +@AllArgsConstructor +public class ShardConsumerNotifyingSubscriber implements NotifyingSubscriber { + + private final Subscriber delegate; + + private final RecordsPublisher recordsPublisher; + + @Override + public Subscriber getDelegateSubscriber() { + return delegate; + } + + @Override + public RecordsPublisher getRecordsPublisher() { + return recordsPublisher; + } + + @Override + public RecordsDeliveryAck getRecordsDeliveryAck(RecordsRetrieved recordsRetrieved) { + return () -> recordsRetrieved.batchUniqueIdentifier(); + } +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java index 102982f2..14e347a2 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java @@ -82,7 +82,7 @@ class ShardConsumerSubscriber implements Subscriber { recordsPublisher.restartFrom(lastAccepted); } Flowable.fromPublisher(recordsPublisher).subscribeOn(scheduler).observeOn(scheduler, true, bufferSize) - .subscribe(this); + .subscribe(new ShardConsumerNotifyingSubscriber(this, recordsPublisher)); } } @@ -216,4 +216,5 @@ class ShardConsumerSubscriber implements Subscriber { subscription.cancel(); } } + } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/BatchUniqueIdentifier.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/BatchUniqueIdentifier.java new file mode 100644 index 00000000..0a0e2aeb --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/BatchUniqueIdentifier.java @@ -0,0 +1,26 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package software.amazon.kinesis.retrieval; + +import lombok.Data; +import software.amazon.kinesis.annotations.KinesisClientInternalApi; + +@KinesisClientInternalApi +@Data +public class BatchUniqueIdentifier { + private final String recordBatchIdentifier; + private final String flowIdentifier; +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsDeliveryAck.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsDeliveryAck.java new file mode 100644 index 00000000..487e1637 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsDeliveryAck.java @@ -0,0 +1,29 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package software.amazon.kinesis.retrieval; + +/** + * Interface to supply all the meta information for record delivery ack. + */ +public interface RecordsDeliveryAck { + + /** + * Unique record batch identifier used to ensure the durability and ordering guarantees. + * @return id that uniquely determines a record batch and its source. + */ + BatchUniqueIdentifier batchUniqueIdentifier(); + +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsPublisher.java index fffeee08..cd1e04f1 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsPublisher.java @@ -18,7 +18,6 @@ package software.amazon.kinesis.retrieval; import org.reactivestreams.Publisher; import software.amazon.kinesis.common.InitialPositionInStreamExtended; -import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; /** @@ -47,4 +46,12 @@ public interface RecordsPublisher extends Publisher { * Shutdowns the publisher. Once this method returns the publisher should no longer provide any records. */ void shutdown(); + + /** + * Notify the publisher on receipt of a data event. + * @param ack acknowledgement received from the subscriber. + */ + default void notify(RecordsDeliveryAck ack) { + throw new UnsupportedOperationException("RecordsPublisher does not support acknowledgement from Subscriber"); + } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsRetrieved.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsRetrieved.java index 7db9d05f..f6f5bb7f 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsRetrieved.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsRetrieved.java @@ -24,4 +24,13 @@ public interface RecordsRetrieved { * @return the processRecordsInput received */ ProcessRecordsInput processRecordsInput(); + + /** + * Returns the identifier that uniquely identifies this batch. + * + * @return batchUniqueIdentifier that uniquely identifies the records batch and its source. + */ + default BatchUniqueIdentifier batchUniqueIdentifier() { + throw new UnsupportedOperationException("Retrieval of batch unique identifier is not supported"); + } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java index e738f0c6..53c3d715 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java @@ -15,20 +15,15 @@ package software.amazon.kinesis.retrieval.fanout; -import java.time.Instant; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; - -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; - +import com.google.common.annotations.VisibleForTesting; import lombok.Data; +import lombok.Getter; import lombok.NonNull; import lombok.RequiredArgsConstructor; import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; import software.amazon.awssdk.core.async.SdkPublisher; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; @@ -41,13 +36,26 @@ import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.KinesisRequestsBuilder; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; +import software.amazon.kinesis.retrieval.BatchUniqueIdentifier; import software.amazon.kinesis.retrieval.IteratorBuilder; import software.amazon.kinesis.retrieval.KinesisClientRecord; +import software.amazon.kinesis.retrieval.RecordsDeliveryAck; import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RecordsRetrieved; import software.amazon.kinesis.retrieval.RetryableRetrievalException; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; +import java.time.Instant; +import java.util.Collections; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +import static software.amazon.kinesis.common.DiagnosticUtils.takeDelayedDeliveryActionIfRequired; + @RequiredArgsConstructor @Slf4j @KinesisClientInternalApi @@ -55,6 +63,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { private static final ThrowableCategory ACQUIRE_TIMEOUT_CATEGORY = new ThrowableCategory( ThrowableType.ACQUIRE_TIMEOUT); private static final ThrowableCategory READ_TIMEOUT_CATEGORY = new ThrowableCategory(ThrowableType.READ_TIMEOUT); + private static final int MAX_EVENT_BURST_FROM_SERVICE = 10; private final KinesisAsyncClient kinesis; private final String shardId; @@ -63,9 +72,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher { private final Object lockObject = new Object(); private final AtomicInteger subscribeToShardId = new AtomicInteger(0); - private RecordFlow flow; - + @Getter @VisibleForTesting private String currentSequenceNumber; private InitialPositionInStreamExtended initialPositionInStreamExtended; private boolean isFirstConnection = true; @@ -73,6 +81,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher { private Subscriber subscriber; private long availableQueueSpace = 0; + private BlockingQueue recordsDeliveryQueue = new LinkedBlockingQueue<>(MAX_EVENT_BURST_FROM_SERVICE); + @Override public void start(ExtendedSequenceNumber extendedSequenceNumber, InitialPositionInStreamExtended initialPositionInStreamExtended) { @@ -114,12 +124,107 @@ public class FanOutRecordsPublisher implements RecordsPublisher { } } + @Override + public void notify(RecordsDeliveryAck recordsDeliveryAck) { + synchronized (lockObject) { + RecordFlow triggeringFlow = null; + try { + triggeringFlow = evictAckedEventAndScheduleNextEvent(recordsDeliveryAck); + } catch (Throwable t) { + errorOccurred(triggeringFlow, t); + } + if (triggeringFlow != null) { + updateAvailableQueueSpaceAndRequestUpstream(triggeringFlow); + } + } + } + + // This method is not thread-safe. You need to acquire a lock in the caller in order to execute this. + @VisibleForTesting + RecordFlow evictAckedEventAndScheduleNextEvent(RecordsDeliveryAck recordsDeliveryAck) { + // Peek the head of the queue on receiving the ack. + // Note : This does not block wait to retrieve an element. + final RecordsRetrievedContext recordsRetrievedContext = recordsDeliveryQueue.peek(); + // RecordFlow of the current event that needs to be returned + RecordFlow flowToBeReturned = null; + + // Check if the ack corresponds to the head of the delivery queue. + if (recordsRetrievedContext != null && recordsRetrievedContext.getRecordsRetrieved().batchUniqueIdentifier() + .equals(recordsDeliveryAck.batchUniqueIdentifier())) { + // It is now safe to remove the element + recordsDeliveryQueue.poll(); + // Take action based on the time spent by the event in queue. + takeDelayedDeliveryActionIfRequired(shardId, recordsRetrievedContext.getEnqueueTimestamp(), log); + // Update current sequence number for the successfully delivered event. + currentSequenceNumber = ((FanoutRecordsRetrieved)recordsRetrievedContext.getRecordsRetrieved()).continuationSequenceNumber(); + // Update the triggering flow for post scheduling upstream request. + flowToBeReturned = recordsRetrievedContext.getRecordFlow(); + // Try scheduling the next event in the queue, if available. + if (recordsDeliveryQueue.peek() != null) { + subscriber.onNext(recordsDeliveryQueue.peek().getRecordsRetrieved()); + } + } else { + // Check if the mismatched event belongs to active flow. If publisher receives an ack for a + // missing event in active flow, then it means the event was already acked or cleared + // from the queue due to a potential bug. + if (flow != null && recordsDeliveryAck.batchUniqueIdentifier().getFlowIdentifier() + .equals(flow.getSubscribeToShardId())) { + log.error( + "{}: Received unexpected ack for the active subscription {}. Throwing. ", + shardId, recordsDeliveryAck.batchUniqueIdentifier().getFlowIdentifier()); + throw new IllegalStateException("Unexpected ack for the active subscription"); + } + // Otherwise publisher received a stale ack. + else { + log.info("{}: Publisher received an ack for stale subscription {}. Ignoring.", shardId, + recordsDeliveryAck.batchUniqueIdentifier().getFlowIdentifier()); + } + } + return flowToBeReturned; + } + + // This method is not thread-safe. You need to acquire a lock in the caller in order to execute this. + @VisibleForTesting + void bufferCurrentEventAndScheduleIfRequired(RecordsRetrieved recordsRetrieved, RecordFlow triggeringFlow) { + final RecordsRetrievedContext recordsRetrievedContext = + new RecordsRetrievedContext(recordsRetrieved, triggeringFlow, Instant.now()); + try { + // Try enqueueing the RecordsRetrieved batch to the queue, which would throw exception on failure. + // Note: This does not block wait to enqueue. + recordsDeliveryQueue.add(recordsRetrievedContext); + // If the current batch is the only element in the queue, then try scheduling the event delivery. + if (recordsDeliveryQueue.size() == 1) { + subscriber.onNext(recordsRetrieved); + } + } catch (IllegalStateException e) { + log.warn("{}: Unable to enqueue the payload due to capacity restrictions in delivery queue with remaining capacity {} ", + shardId, recordsDeliveryQueue.remainingCapacity()); + throw e; + } catch (Throwable t) { + log.error("{}: Unable to deliver event to the shard consumer.", shardId, t); + throw t; + } + } + + @Data + private static final class RecordsRetrievedContext { + private final RecordsRetrieved recordsRetrieved; + private final RecordFlow recordFlow; + private final Instant enqueueTimestamp; + } + private boolean hasValidSubscriber() { return subscriber != null; } + private boolean hasValidFlow() { + return flow != null; + } + private void subscribeToShard(String sequenceNumber) { synchronized (lockObject) { + // Clear the queue so that any stale entries from previous subscription are discarded. + clearRecordsDeliveryQueue(); SubscribeToShardRequest.Builder builder = KinesisRequestsBuilder.subscribeToShardRequestBuilder() .shardId(shardId).consumerARN(consumerArn); SubscribeToShardRequest request; @@ -143,12 +248,23 @@ public class FanOutRecordsPublisher implements RecordsPublisher { private void errorOccurred(RecordFlow triggeringFlow, Throwable t) { synchronized (lockObject) { + if (!hasValidSubscriber()) { - log.warn( - "{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ {} id: {} -- Subscriber is null", - shardId, flow.connectionStartedAt, flow.subscribeToShardId); + if(hasValidFlow()) { + log.warn( + "{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ {} id: {} -- Subscriber is null", + shardId, flow.connectionStartedAt, flow.subscribeToShardId); + } else { + log.warn( + "{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) -- Subscriber and flow are null", + shardId); + } return; } + + // Clear the delivery buffer so that next subscription don't yield duplicate records. + clearRecordsDeliveryQueue(); + Throwable propagationThrowable = t; ThrowableCategory category = throwableCategory(propagationThrowable); @@ -178,7 +294,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { availableQueueSpace = 0; try { - handleFlowError(propagationThrowable); + handleFlowError(propagationThrowable, triggeringFlow); } catch (Throwable innerThrowable) { log.warn("{}: Exception while calling subscriber.onError", shardId, innerThrowable); } @@ -197,6 +313,10 @@ public class FanOutRecordsPublisher implements RecordsPublisher { } } + private void clearRecordsDeliveryQueue() { + recordsDeliveryQueue.clear(); + } + protected void logAcquireTimeoutMessage(Throwable t) { log.error("An acquire timeout occurred which usually indicates that the KinesisAsyncClient supplied has a " + "low maximum streams limit. " + @@ -204,13 +324,16 @@ public class FanOutRecordsPublisher implements RecordsPublisher { "or refer to the class to setup the client manually."); } - private void handleFlowError(Throwable t) { + private void handleFlowError(Throwable t, RecordFlow triggeringFlow) { if (t.getCause() instanceof ResourceNotFoundException) { log.debug( "{}: Could not call SubscribeToShard successfully because shard no longer exists. Marking shard for completion.", shardId); + // The ack received for this onNext event will be ignored by the publisher as the global flow object should + // be either null or renewed when the ack's flow identifier is evaluated. FanoutRecordsRetrieved response = new FanoutRecordsRetrieved( - ProcessRecordsInput.builder().records(Collections.emptyList()).isAtShardEnd(true).build(), null); + ProcessRecordsInput.builder().records(Collections.emptyList()).isAtShardEnd(true).build(), null, + triggeringFlow != null ? triggeringFlow.getSubscribeToShardId() : shardId + "-no-flow-found"); subscriber.onNext(response); subscriber.onComplete(); } else { @@ -294,28 +417,26 @@ public class FanOutRecordsPublisher implements RecordsPublisher { .millisBehindLatest(recordBatchEvent.millisBehindLatest()) .isAtShardEnd(recordBatchEvent.continuationSequenceNumber() == null).records(records).build(); FanoutRecordsRetrieved recordsRetrieved = new FanoutRecordsRetrieved(input, - recordBatchEvent.continuationSequenceNumber()); + recordBatchEvent.continuationSequenceNumber(), triggeringFlow.subscribeToShardId); try { - subscriber.onNext(recordsRetrieved); - // - // Only advance the currentSequenceNumber if we successfully dispatch the last received input - // - currentSequenceNumber = recordBatchEvent.continuationSequenceNumber(); + bufferCurrentEventAndScheduleIfRequired(recordsRetrieved, triggeringFlow); } catch (Throwable t) { - log.warn("{}: Unable to call onNext for subscriber. Failing publisher.", shardId); + log.warn("{}: Unable to buffer or schedule onNext for subscriber. Failing publisher.", shardId); errorOccurred(triggeringFlow, t); } + } + } - if (availableQueueSpace <= 0) { - log.debug( - "{}: [SubscriptionLifetime] (FanOutRecordsPublisher#recordsReceived) @ {} id: {} -- Attempted to decrement availableQueueSpace to below 0", - shardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId); - } else { - availableQueueSpace--; - if (availableQueueSpace > 0) { - triggeringFlow.request(1); - } + private void updateAvailableQueueSpaceAndRequestUpstream(RecordFlow triggeringFlow) { + if (availableQueueSpace <= 0) { + log.debug( + "{}: [SubscriptionLifetime] (FanOutRecordsPublisher#recordsReceived) @ {} id: {} -- Attempted to decrement availableQueueSpace to below 0", + shardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId); + } else { + availableQueueSpace--; + if (availableQueueSpace > 0) { + triggeringFlow.request(1); } } } @@ -489,11 +610,18 @@ public class FanOutRecordsPublisher implements RecordsPublisher { private final ProcessRecordsInput processRecordsInput; private final String continuationSequenceNumber; + private final String flowIdentifier; + private final String batchUniqueIdentifier = UUID.randomUUID().toString(); @Override public ProcessRecordsInput processRecordsInput() { return processRecordsInput; } + + @Override + public BatchUniqueIdentifier batchUniqueIdentifier() { + return new BatchUniqueIdentifier(batchUniqueIdentifier, flowIdentifier); + } } @RequiredArgsConstructor @@ -502,6 +630,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { private final FanOutRecordsPublisher parent; private final Instant connectionStartedAt; + @Getter @VisibleForTesting private final String subscribeToShardId; private RecordSubscription subscription; @@ -734,5 +863,4 @@ public class FanOutRecordsPublisher implements RecordsPublisher { } } - } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java index 83991be9..f5aaf051 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java @@ -20,6 +20,7 @@ import java.time.Instant; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -49,11 +50,14 @@ import software.amazon.kinesis.metrics.MetricsUtil; import software.amazon.kinesis.metrics.ThreadSafeMetricsDelegatingFactory; import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; import software.amazon.kinesis.retrieval.KinesisClientRecord; +import software.amazon.kinesis.retrieval.RecordsDeliveryAck; import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RecordsRetrieved; import software.amazon.kinesis.retrieval.RetryableRetrievalException; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; +import static software.amazon.kinesis.common.DiagnosticUtils.takeDelayedDeliveryActionIfRequired; + /** * This is the prefetch caching class, this class spins up a thread if prefetching is enabled. That thread fetches the * next set of records and stores it in the cache. The size of the cache is limited by setting @@ -93,6 +97,9 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { private final ReentrantReadWriteLock resetLock = new ReentrantReadWriteLock(); private boolean wasReset = false; + private final Semaphore eventDeliveryLock = new Semaphore(1); + private Instant eventDeliveryLockAcquireTime = Instant.EPOCH; + /** * Constructor for the PrefetchRecordsPublisher. This cache prefetches records from Kinesis and stores them in a * LinkedBlockingQueue. @@ -216,6 +223,13 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { }); } + @Override + public void notify(RecordsDeliveryAck ack) { + eventDeliveryLock.release(); + // Take action based on the time spent by the event in queue. + takeDelayedDeliveryActionIfRequired(shardId, eventDeliveryLockAcquireTime, log); + } + private void addArrivedRecordsInput(PrefetchRecordsRetrieved recordsRetrieved) throws InterruptedException { wasReset = false; while (!getRecordsResultQueue.offer(recordsRetrieved, idleMillisBetweenCalls, TimeUnit.MILLISECONDS)) { @@ -236,6 +250,8 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { private synchronized void drainQueueForRequests() { while (requestedResponses.get() > 0 && !getRecordsResultQueue.isEmpty()) { + eventDeliveryLock.acquireUninterruptibly(); + eventDeliveryLockAcquireTime = Instant.now(); subscriber.onNext(getNextResult()); } } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java index 494dd282..a4002ab9 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java @@ -64,6 +64,7 @@ import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.retrieval.KinesisClientRecord; +import software.amazon.kinesis.retrieval.RecordsDeliveryAck; import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RecordsRetrieved; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; @@ -554,6 +555,11 @@ public class ShardConsumerSubscriberTest { } + @Override + public void notify(RecordsDeliveryAck ack) { + + } + @Override public void shutdown() { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java index 320512e6..533a200a 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java @@ -74,6 +74,7 @@ import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.TaskExecutionListenerInput; import software.amazon.kinesis.lifecycle.ConsumerStates.ShardConsumerState; +import software.amazon.kinesis.retrieval.RecordsDeliveryAck; import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RecordsRetrieved; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; @@ -198,6 +199,11 @@ public class ShardConsumerTest { } + @Override + public void notify(RecordsDeliveryAck ack) { + + } + @Override public void shutdown() { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java index b12fc1a5..1da65f2d 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java @@ -1,27 +1,14 @@ package software.amazon.kinesis.retrieval.fanout; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.reset; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -import java.nio.ByteBuffer; -import java.time.Instant; -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.CompletionException; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.stream.Collectors; -import java.util.stream.Stream; - +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.netty.handler.timeout.ReadTimeoutException; +import io.reactivex.Flowable; +import io.reactivex.Scheduler; +import io.reactivex.schedulers.Schedulers; +import io.reactivex.subscribers.SafeSubscriber; +import lombok.Data; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.hamcrest.Description; import org.hamcrest.Matcher; import org.hamcrest.TypeSafeDiagnosingMatcher; @@ -29,13 +16,10 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; import org.mockito.Mock; +import org.mockito.invocation.InvocationOnMock; import org.mockito.runners.MockitoJUnitRunner; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; - -import io.netty.handler.timeout.ReadTimeoutException; -import lombok.Data; -import lombok.extern.slf4j.Slf4j; import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.core.async.SdkPublisher; import software.amazon.awssdk.core.exception.SdkClientException; @@ -49,12 +33,51 @@ import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.lifecycle.ShardConsumerNotifyingSubscriber; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; +import software.amazon.kinesis.retrieval.BatchUniqueIdentifier; import software.amazon.kinesis.retrieval.KinesisClientRecord; import software.amazon.kinesis.retrieval.RecordsRetrieved; import software.amazon.kinesis.retrieval.RetryableRetrievalException; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; +import java.nio.ByteBuffer; +import java.time.Instant; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + @RunWith(MockitoJUnitRunner.class) @Slf4j public class FanOutRecordsPublisherTest { @@ -89,32 +112,28 @@ public class FanOutRecordsPublisherTest { List receivedInput = new ArrayList<>(); - source.subscribe(new Subscriber() { + source.subscribe(new ShardConsumerNotifyingSubscriber(new Subscriber() { Subscription subscription; - @Override - public void onSubscribe(Subscription s) { + @Override public void onSubscribe(Subscription s) { subscription = s; subscription.request(1); } - @Override - public void onNext(RecordsRetrieved input) { + @Override public void onNext(RecordsRetrieved input) { receivedInput.add(input.processRecordsInput()); subscription.request(1); } - @Override - public void onError(Throwable t) { + @Override public void onError(Throwable t) { log.error("Caught throwable in subscriber", t); fail("Caught throwable in subscriber"); } - @Override - public void onComplete() { + @Override public void onComplete() { fail("OnComplete called when not expected"); } - }); + }, source)); verify(kinesisClient).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture()); flowCaptor.getValue().onEventStream(publisher); @@ -142,6 +161,473 @@ public class FanOutRecordsPublisherTest { } + @Test + public void testIfAllEventsReceivedWhenNoTasksRejectedByExecutor() throws Exception { + FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN); + + ArgumentCaptor captor = ArgumentCaptor + .forClass(FanOutRecordsPublisher.RecordSubscription.class); + ArgumentCaptor flowCaptor = ArgumentCaptor + .forClass(FanOutRecordsPublisher.RecordFlow.class); + + doNothing().when(publisher).subscribe(captor.capture()); + + source.start(ExtendedSequenceNumber.LATEST, + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)); + + List receivedInput = new ArrayList<>(); + + Subscriber shardConsumerSubscriber = new ShardConsumerNotifyingSubscriber( + new Subscriber() { + Subscription subscription; + + @Override public void onSubscribe(Subscription s) { + subscription = s; + subscription.request(1); + } + + @Override public void onNext(RecordsRetrieved input) { + receivedInput.add(input.processRecordsInput()); + subscription.request(1); + } + + @Override public void onError(Throwable t) { + log.error("Caught throwable in subscriber", t); + fail("Caught throwable in subscriber"); + } + + @Override public void onComplete() { + fail("OnComplete called when not expected"); + } + }, source); + + Scheduler testScheduler = getScheduler(getBlockingExecutor(getSpiedExecutor(getTestExecutor()))); + int bufferSize = 8; + + Flowable.fromPublisher(source).subscribeOn(testScheduler).observeOn(testScheduler, true, bufferSize) + .subscribe(shardConsumerSubscriber); + + verify(kinesisClient).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture()); + flowCaptor.getValue().onEventStream(publisher); + captor.getValue().onSubscribe(subscription); + + List records = Stream.of(1, 2, 3).map(this::makeRecord).collect(Collectors.toList()); + List matchers = records.stream().map(KinesisClientRecordMatcher::new) + .collect(Collectors.toList()); + + Stream.of("1000", "2000", "3000") + .map(contSeqNum -> + SubscribeToShardEvent.builder() + .millisBehindLatest(100L) + .continuationSequenceNumber(contSeqNum) + .records(records).build()) + .forEach(batchEvent -> captor.getValue().onNext(batchEvent)); + + verify(subscription, times(4)).request(1); + assertThat(receivedInput.size(), equalTo(3)); + + receivedInput.stream().map(ProcessRecordsInput::records).forEach(clientRecordsList -> { + assertThat(clientRecordsList.size(), equalTo(matchers.size())); + for (int i = 0; i < clientRecordsList.size(); ++i) { + assertThat(clientRecordsList.get(i), matchers.get(i)); + } + }); + + assertThat(source.getCurrentSequenceNumber(), equalTo("3000")); + + } + + @Test + public void testIfEventsAreNotDeliveredToShardConsumerWhenPreviousEventDeliveryTaskGetsRejected() throws Exception { + FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN); + + ArgumentCaptor captor = ArgumentCaptor + .forClass(FanOutRecordsPublisher.RecordSubscription.class); + ArgumentCaptor flowCaptor = ArgumentCaptor + .forClass(FanOutRecordsPublisher.RecordFlow.class); + + doNothing().when(publisher).subscribe(captor.capture()); + + source.start(ExtendedSequenceNumber.LATEST, + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)); + + List receivedInput = new ArrayList<>(); + + Subscriber shardConsumerSubscriber = new ShardConsumerNotifyingSubscriber( + new Subscriber() { + Subscription subscription; + + @Override public void onSubscribe(Subscription s) { + subscription = s; + subscription.request(1); + } + + @Override public void onNext(RecordsRetrieved input) { + receivedInput.add(input.processRecordsInput()); + subscription.request(1); + } + + @Override public void onError(Throwable t) { + log.error("Caught throwable in subscriber", t); + fail("Caught throwable in subscriber"); + } + + @Override public void onComplete() { + fail("OnComplete called when not expected"); + } + }, source); + + Scheduler testScheduler = getScheduler(getOverwhelmedBlockingExecutor(getSpiedExecutor(getTestExecutor()))); + int bufferSize = 8; + + Flowable.fromPublisher(source).subscribeOn(testScheduler).observeOn(testScheduler, true, bufferSize) + .subscribe(new SafeSubscriber<>(shardConsumerSubscriber)); + + verify(kinesisClient).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture()); + flowCaptor.getValue().onEventStream(publisher); + captor.getValue().onSubscribe(subscription); + + List records = Stream.of(1, 2, 3).map(this::makeRecord).collect(Collectors.toList()); + List matchers = records.stream().map(KinesisClientRecordMatcher::new) + .collect(Collectors.toList()); + + Stream.of("1000", "2000", "3000") + .map(contSeqNum -> + SubscribeToShardEvent.builder() + .millisBehindLatest(100L) + .continuationSequenceNumber(contSeqNum) + .records(records).build()) + .forEach(batchEvent -> captor.getValue().onNext(batchEvent)); + + verify(subscription, times(2)).request(1); + assertThat(receivedInput.size(), equalTo(1)); + + receivedInput.stream().map(ProcessRecordsInput::records).forEach(clientRecordsList -> { + assertThat(clientRecordsList.size(), equalTo(matchers.size())); + for (int i = 0; i < clientRecordsList.size(); ++i) { + assertThat(clientRecordsList.get(i), matchers.get(i)); + } + }); + + assertThat(source.getCurrentSequenceNumber(), equalTo("1000")); + + } + + @Test + public void testIfStreamOfEventsAreDeliveredInOrderWithBackpressureAdheringServicePublisher() throws Exception { + FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN); + + ArgumentCaptor captor = ArgumentCaptor + .forClass(FanOutRecordsPublisher.RecordSubscription.class); + ArgumentCaptor flowCaptor = ArgumentCaptor + .forClass(FanOutRecordsPublisher.RecordFlow.class); + + List records = Stream.of(1, 2, 3).map(this::makeRecord).collect(Collectors.toList()); + + Consumer servicePublisherAction = contSeqNum -> captor.getValue().onNext( + SubscribeToShardEvent.builder() + .millisBehindLatest(100L) + .continuationSequenceNumber(contSeqNum + "") + .records(records) + .build()); + + CountDownLatch servicePublisherTaskCompletionLatch = new CountDownLatch(2); + int totalServicePublisherEvents = 1000; + int initialDemand = 0; + BackpressureAdheringServicePublisher servicePublisher = + new BackpressureAdheringServicePublisher(servicePublisherAction, totalServicePublisherEvents, servicePublisherTaskCompletionLatch, initialDemand); + + doNothing().when(publisher).subscribe(captor.capture()); + + source.start(ExtendedSequenceNumber.LATEST, + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)); + + List receivedInput = new ArrayList<>(); + + Subscriber shardConsumerSubscriber = new ShardConsumerNotifyingSubscriber( + new Subscriber() { + private Subscription subscription; + private int lastSeenSeqNum = 0; + + @Override public void onSubscribe(Subscription s) { + subscription = s; + subscription.request(1); + servicePublisher.request(1); + } + + @Override public void onNext(RecordsRetrieved input) { + receivedInput.add(input.processRecordsInput()); + assertEquals("" + ++lastSeenSeqNum, ((FanOutRecordsPublisher.FanoutRecordsRetrieved)input).continuationSequenceNumber()); + subscription.request(1); + servicePublisher.request(1); + if(receivedInput.size() == totalServicePublisherEvents) { + servicePublisherTaskCompletionLatch.countDown(); + } + } + + @Override public void onError(Throwable t) { + log.error("Caught throwable in subscriber", t); + fail("Caught throwable in subscriber"); + } + + @Override public void onComplete() { + fail("OnComplete called when not expected"); + } + }, source); + + ExecutorService executorService = getTestExecutor(); + Scheduler testScheduler = getScheduler(getInitiallyBlockingExecutor(getSpiedExecutor(executorService))); + int bufferSize = 8; + + Flowable.fromPublisher(source).subscribeOn(testScheduler).observeOn(testScheduler, true, bufferSize) + .subscribe(shardConsumerSubscriber); + + verify(kinesisClient).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture()); + flowCaptor.getValue().onEventStream(publisher); + captor.getValue().onSubscribe(subscription); + + List matchers = records.stream().map(KinesisClientRecordMatcher::new) + .collect(Collectors.toList()); + + executorService.submit(servicePublisher); + servicePublisherTaskCompletionLatch.await(5000, TimeUnit.MILLISECONDS); + + assertThat(receivedInput.size(), equalTo(totalServicePublisherEvents)); + + receivedInput.stream().map(ProcessRecordsInput::records).forEach(clientRecordsList -> { + assertThat(clientRecordsList.size(), equalTo(matchers.size())); + for (int i = 0; i < clientRecordsList.size(); ++i) { + assertThat(clientRecordsList.get(i), matchers.get(i)); + } + }); + + assertThat(source.getCurrentSequenceNumber(), equalTo(totalServicePublisherEvents + "")); + + } + + @Test + public void testIfStreamOfEventsAreDeliveredInOrderWithBackpressureAdheringServicePublisherHavingInitialBurstWithinLimit() throws Exception { + FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN); + + ArgumentCaptor captor = ArgumentCaptor + .forClass(FanOutRecordsPublisher.RecordSubscription.class); + ArgumentCaptor flowCaptor = ArgumentCaptor + .forClass(FanOutRecordsPublisher.RecordFlow.class); + + List records = Stream.of(1, 2, 3).map(this::makeRecord).collect(Collectors.toList()); + + Consumer servicePublisherAction = contSeqNum -> captor.getValue().onNext( + SubscribeToShardEvent.builder() + .millisBehindLatest(100L) + .continuationSequenceNumber(contSeqNum + "") + .records(records) + .build()); + + CountDownLatch servicePublisherTaskCompletionLatch = new CountDownLatch(2); + int totalServicePublisherEvents = 1000; + int initialDemand = 9; + BackpressureAdheringServicePublisher servicePublisher = + new BackpressureAdheringServicePublisher(servicePublisherAction, totalServicePublisherEvents, servicePublisherTaskCompletionLatch, initialDemand); + + doNothing().when(publisher).subscribe(captor.capture()); + + source.start(ExtendedSequenceNumber.LATEST, + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)); + + List receivedInput = new ArrayList<>(); + + Subscriber shardConsumerSubscriber = new ShardConsumerNotifyingSubscriber( + new Subscriber() { + private Subscription subscription; + private int lastSeenSeqNum = 0; + + @Override public void onSubscribe(Subscription s) { + subscription = s; + subscription.request(1); + servicePublisher.request(1); + } + + @Override public void onNext(RecordsRetrieved input) { + receivedInput.add(input.processRecordsInput()); + assertEquals("" + ++lastSeenSeqNum, ((FanOutRecordsPublisher.FanoutRecordsRetrieved)input).continuationSequenceNumber()); + subscription.request(1); + servicePublisher.request(1); + if(receivedInput.size() == totalServicePublisherEvents) { + servicePublisherTaskCompletionLatch.countDown(); + } + } + + @Override public void onError(Throwable t) { + log.error("Caught throwable in subscriber", t); + fail("Caught throwable in subscriber"); + } + + @Override public void onComplete() { + fail("OnComplete called when not expected"); + } + }, source); + + ExecutorService executorService = getTestExecutor(); + Scheduler testScheduler = getScheduler(getInitiallyBlockingExecutor(getSpiedExecutor(executorService))); + int bufferSize = 8; + + Flowable.fromPublisher(source).subscribeOn(testScheduler).observeOn(testScheduler, true, bufferSize) + .subscribe(shardConsumerSubscriber); + + verify(kinesisClient).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture()); + flowCaptor.getValue().onEventStream(publisher); + captor.getValue().onSubscribe(subscription); + + List matchers = records.stream().map(KinesisClientRecordMatcher::new) + .collect(Collectors.toList()); + + executorService.submit(servicePublisher); + servicePublisherTaskCompletionLatch.await(5000, TimeUnit.MILLISECONDS); + + assertThat(receivedInput.size(), equalTo(totalServicePublisherEvents)); + + receivedInput.stream().map(ProcessRecordsInput::records).forEach(clientRecordsList -> { + assertThat(clientRecordsList.size(), equalTo(matchers.size())); + for (int i = 0; i < clientRecordsList.size(); ++i) { + assertThat(clientRecordsList.get(i), matchers.get(i)); + } + }); + + assertThat(source.getCurrentSequenceNumber(), equalTo(totalServicePublisherEvents + "")); + + } + + @Test + public void testIfStreamOfEventsAreDeliveredInOrderWithBackpressureAdheringServicePublisherHavingInitialBurstOverLimit() throws Exception { + FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN); + + ArgumentCaptor captor = ArgumentCaptor + .forClass(FanOutRecordsPublisher.RecordSubscription.class); + ArgumentCaptor flowCaptor = ArgumentCaptor + .forClass(FanOutRecordsPublisher.RecordFlow.class); + + List records = Stream.of(1, 2, 3).map(this::makeRecord).collect(Collectors.toList()); + + Consumer servicePublisherAction = contSeqNum -> captor.getValue().onNext( + SubscribeToShardEvent.builder() + .millisBehindLatest(100L) + .continuationSequenceNumber(contSeqNum + "") + .records(records) + .build()); + + CountDownLatch servicePublisherTaskCompletionLatch = new CountDownLatch(1); + int totalServicePublisherEvents = 1000; + int initialDemand = 10; + BackpressureAdheringServicePublisher servicePublisher = + new BackpressureAdheringServicePublisher(servicePublisherAction, totalServicePublisherEvents, servicePublisherTaskCompletionLatch, initialDemand); + + doNothing().when(publisher).subscribe(captor.capture()); + + source.start(ExtendedSequenceNumber.LATEST, + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)); + + List receivedInput = new ArrayList<>(); + AtomicBoolean onErrorSet = new AtomicBoolean(false); + + Subscriber shardConsumerSubscriber = new ShardConsumerNotifyingSubscriber( + new Subscriber() { + private Subscription subscription; + private int lastSeenSeqNum = 0; + + @Override public void onSubscribe(Subscription s) { + subscription = s; + subscription.request(1); + servicePublisher.request(1); + } + + @Override public void onNext(RecordsRetrieved input) { + receivedInput.add(input.processRecordsInput()); + assertEquals("" + ++lastSeenSeqNum, ((FanOutRecordsPublisher.FanoutRecordsRetrieved)input).continuationSequenceNumber()); + subscription.request(1); + servicePublisher.request(1); + } + + @Override public void onError(Throwable t) { + log.error("Caught throwable in subscriber", t); + onErrorSet.set(true); + servicePublisherTaskCompletionLatch.countDown(); + } + + @Override public void onComplete() { + fail("OnComplete called when not expected"); + } + }, source); + + ExecutorService executorService = getTestExecutor(); + Scheduler testScheduler = getScheduler(getInitiallyBlockingExecutor(getSpiedExecutor(executorService))); + int bufferSize = 8; + + Flowable.fromPublisher(source).subscribeOn(testScheduler).observeOn(testScheduler, true, bufferSize) + .subscribe(shardConsumerSubscriber); + + verify(kinesisClient).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture()); + flowCaptor.getValue().onEventStream(publisher); + captor.getValue().onSubscribe(subscription); + + List matchers = records.stream().map(KinesisClientRecordMatcher::new) + .collect(Collectors.toList()); + + executorService.submit(servicePublisher); + servicePublisherTaskCompletionLatch.await(5000, TimeUnit.MILLISECONDS); + + assertTrue("onError should have triggered", onErrorSet.get()); + + receivedInput.stream().map(ProcessRecordsInput::records).forEach(clientRecordsList -> { + assertThat(clientRecordsList.size(), equalTo(matchers.size())); + for (int i = 0; i < clientRecordsList.size(); ++i) { + assertThat(clientRecordsList.get(i), matchers.get(i)); + } + }); + } + + private Scheduler getScheduler(ExecutorService executorService) { + return Schedulers.from(executorService); + } + + private ExecutorService getTestExecutor() { + return Executors.newFixedThreadPool(8, + new ThreadFactoryBuilder().setNameFormat("test-fanout-record-publisher-%04d").setDaemon(true).build()); + } + + private ExecutorService getSpiedExecutor(ExecutorService executorService) { + return spy(executorService); + } + + private ExecutorService getBlockingExecutor(ExecutorService executorService) { + doAnswer(invocation -> directlyExecuteRunnable(invocation)).when(executorService).execute(any()); + return executorService; + } + + private ExecutorService getInitiallyBlockingExecutor(ExecutorService executorService) { + doAnswer(invocation -> directlyExecuteRunnable(invocation)) + .doAnswer(invocation -> directlyExecuteRunnable(invocation)) + .doCallRealMethod() + .when(executorService).execute(any()); + return executorService; + } + + private ExecutorService getOverwhelmedBlockingExecutor(ExecutorService executorService) { + doAnswer(invocation -> directlyExecuteRunnable(invocation)) + .doAnswer(invocation -> directlyExecuteRunnable(invocation)) + .doAnswer(invocation -> directlyExecuteRunnable(invocation)) + .doThrow(new RejectedExecutionException()) + .doAnswer(invocation -> directlyExecuteRunnable(invocation)) + .when(executorService).execute(any()); + return executorService; + } + + private Object directlyExecuteRunnable(InvocationOnMock invocation) { + Object[] args = invocation.getArguments(); + Runnable runnable = (Runnable) args[0]; + runnable.run(); + return null; + } + @Test public void largeRequestTest() throws Exception { FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN); @@ -158,32 +644,28 @@ public class FanOutRecordsPublisherTest { List receivedInput = new ArrayList<>(); - source.subscribe(new Subscriber() { + source.subscribe(new ShardConsumerNotifyingSubscriber(new Subscriber() { Subscription subscription; - @Override - public void onSubscribe(Subscription s) { + @Override public void onSubscribe(Subscription s) { subscription = s; subscription.request(3); } - @Override - public void onNext(RecordsRetrieved input) { + @Override public void onNext(RecordsRetrieved input) { receivedInput.add(input.processRecordsInput()); subscription.request(1); } - @Override - public void onError(Throwable t) { + @Override public void onError(Throwable t) { log.error("Caught throwable in subscriber", t); fail("Caught throwable in subscriber"); } - @Override - public void onComplete() { + @Override public void onComplete() { fail("OnComplete called when not expected"); } - }); + }, source)); verify(kinesisClient).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture()); flowCaptor.getValue().onEventStream(publisher); @@ -270,7 +752,7 @@ public class FanOutRecordsPublisherTest { NonFailingSubscriber nonFailingSubscriber = new NonFailingSubscriber(); - source.subscribe(nonFailingSubscriber); + source.subscribe(new ShardConsumerNotifyingSubscriber(nonFailingSubscriber, source)); SubscribeToShardRequest expected = SubscribeToShardRequest.builder().consumerARN(CONSUMER_ARN).shardId(SHARD_ID) .startingPosition(StartingPosition.builder().sequenceNumber("0") @@ -327,6 +809,171 @@ public class FanOutRecordsPublisherTest { } + @Test + public void testIfBufferingRecordsWithinCapacityPublishesOneEvent() { + FanOutRecordsPublisher fanOutRecordsPublisher = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN); + RecordsRetrieved recordsRetrieved = ProcessRecordsInput.builder()::build; + FanOutRecordsPublisher.RecordFlow recordFlow = + new FanOutRecordsPublisher.RecordFlow(fanOutRecordsPublisher, Instant.now(), "shard-001-001"); + final int[] totalRecordsRetrieved = { 0 }; + fanOutRecordsPublisher.subscribe(new Subscriber() { + @Override public void onSubscribe(Subscription subscription) {} + @Override public void onNext(RecordsRetrieved recordsRetrieved) { + totalRecordsRetrieved[0]++; + } + @Override public void onError(Throwable throwable) {} + @Override public void onComplete() {} + }); + IntStream.rangeClosed(1, 10).forEach(i -> fanOutRecordsPublisher.bufferCurrentEventAndScheduleIfRequired(recordsRetrieved, recordFlow)); + assertEquals(1, totalRecordsRetrieved[0]); + } + + @Test + public void testIfBufferingRecordsOverCapacityPublishesOneEventAndThrows() { + FanOutRecordsPublisher fanOutRecordsPublisher = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN); + RecordsRetrieved recordsRetrieved = ProcessRecordsInput.builder()::build; + FanOutRecordsPublisher.RecordFlow recordFlow = + new FanOutRecordsPublisher.RecordFlow(fanOutRecordsPublisher, Instant.now(), "shard-001"); + final int[] totalRecordsRetrieved = { 0 }; + fanOutRecordsPublisher.subscribe(new Subscriber() { + @Override public void onSubscribe(Subscription subscription) {} + @Override public void onNext(RecordsRetrieved recordsRetrieved) { + totalRecordsRetrieved[0]++; + } + @Override public void onError(Throwable throwable) {} + @Override public void onComplete() {} + }); + try { + IntStream.rangeClosed(1, 11).forEach( + i -> fanOutRecordsPublisher.bufferCurrentEventAndScheduleIfRequired(recordsRetrieved, recordFlow)); + fail("Should throw Queue full exception"); + } catch (IllegalStateException e) { + assertEquals("Queue full", e.getMessage()); + } + assertEquals(1, totalRecordsRetrieved[0]); + } + + @Test + public void testIfPublisherAlwaysPublishesWhenQueueIsEmpty() { + FanOutRecordsPublisher fanOutRecordsPublisher = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN); + FanOutRecordsPublisher.RecordFlow recordFlow = + new FanOutRecordsPublisher.RecordFlow(fanOutRecordsPublisher, Instant.now(), "shard-001"); + final int[] totalRecordsRetrieved = { 0 }; + fanOutRecordsPublisher.subscribe(new Subscriber() { + @Override public void onSubscribe(Subscription subscription) {} + @Override public void onNext(RecordsRetrieved recordsRetrieved) { + totalRecordsRetrieved[0]++; + // This makes sure the queue is immediately made empty, so that the next event enqueued will + // be the only element in the queue. + fanOutRecordsPublisher + .evictAckedEventAndScheduleNextEvent(() -> recordsRetrieved.batchUniqueIdentifier()); + } + @Override public void onError(Throwable throwable) {} + @Override public void onComplete() {} + }); + IntStream.rangeClosed(1, 137).forEach(i -> fanOutRecordsPublisher.bufferCurrentEventAndScheduleIfRequired( + new FanOutRecordsPublisher.FanoutRecordsRetrieved(ProcessRecordsInput.builder().build(), i + "", recordFlow.getSubscribeToShardId()), + recordFlow)); + assertEquals(137, totalRecordsRetrieved[0]); + } + + @Test + public void testIfPublisherIgnoresStaleEventsAndContinuesWithNextFlow() { + FanOutRecordsPublisher fanOutRecordsPublisher = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN); + FanOutRecordsPublisher.RecordFlow recordFlow = + new FanOutRecordsPublisher.RecordFlow(fanOutRecordsPublisher, Instant.now(), "shard-001"); + final int[] totalRecordsRetrieved = { 0 }; + fanOutRecordsPublisher.subscribe(new Subscriber() { + @Override public void onSubscribe(Subscription subscription) {} + @Override public void onNext(RecordsRetrieved recordsRetrieved) { + totalRecordsRetrieved[0]++; + // This makes sure the queue is immediately made empty, so that the next event enqueued will + // be the only element in the queue. + fanOutRecordsPublisher + .evictAckedEventAndScheduleNextEvent(() -> recordsRetrieved.batchUniqueIdentifier()); + // Send stale event periodically + if(totalRecordsRetrieved[0] % 10 == 0) { + fanOutRecordsPublisher.evictAckedEventAndScheduleNextEvent( + () -> new BatchUniqueIdentifier("some_uuid_str", "some_old_flow")); + } + } + @Override public void onError(Throwable throwable) {} + @Override public void onComplete() {} + }); + IntStream.rangeClosed(1, 100).forEach(i -> fanOutRecordsPublisher.bufferCurrentEventAndScheduleIfRequired( + new FanOutRecordsPublisher.FanoutRecordsRetrieved(ProcessRecordsInput.builder().build(), i + "", recordFlow.getSubscribeToShardId()), + recordFlow)); + assertEquals(100, totalRecordsRetrieved[0]); + } + + @Test + public void testIfPublisherIgnoresStaleEventsAndContinuesWithNextFlowWhenDeliveryQueueIsNotEmpty() + throws InterruptedException { + FanOutRecordsPublisher fanOutRecordsPublisher = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN); + FanOutRecordsPublisher.RecordFlow recordFlow = + new FanOutRecordsPublisher.RecordFlow(fanOutRecordsPublisher, Instant.now(), "shard-001"); + final int[] totalRecordsRetrieved = { 0 }; + BlockingQueue ackQueue = new LinkedBlockingQueue<>(); + fanOutRecordsPublisher.subscribe(new Subscriber() { + @Override public void onSubscribe(Subscription subscription) {} + @Override public void onNext(RecordsRetrieved recordsRetrieved) { + totalRecordsRetrieved[0]++; + // Enqueue the ack for bursty delivery + ackQueue.add(recordsRetrieved.batchUniqueIdentifier()); + // Send stale event periodically + } + @Override public void onError(Throwable throwable) {} + @Override public void onComplete() {} + }); + IntStream.rangeClosed(1, 10).forEach(i -> fanOutRecordsPublisher.bufferCurrentEventAndScheduleIfRequired( + new FanOutRecordsPublisher.FanoutRecordsRetrieved(ProcessRecordsInput.builder().build(), i + "", recordFlow.getSubscribeToShardId()), + recordFlow)); + BatchUniqueIdentifier batchUniqueIdentifierQueued; + int count = 0; + // Now that we allowed upto 10 elements queued up, send a pair of good and stale ack to verify records + // delivered as expected. + while(count++ < 10 && (batchUniqueIdentifierQueued = ackQueue.take()) != null) { + final BatchUniqueIdentifier batchUniqueIdentifierFinal = batchUniqueIdentifierQueued; + fanOutRecordsPublisher + .evictAckedEventAndScheduleNextEvent(() -> batchUniqueIdentifierFinal); + fanOutRecordsPublisher.evictAckedEventAndScheduleNextEvent( + () -> new BatchUniqueIdentifier("some_uuid_str", "some_old_flow")); + } + assertEquals(10, totalRecordsRetrieved[0]); + } + + @Test(expected = IllegalStateException.class) + public void testIfPublisherThrowsWhenMismatchAckforActiveFlowSeen() throws InterruptedException { + FanOutRecordsPublisher fanOutRecordsPublisher = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN); + FanOutRecordsPublisher.RecordFlow recordFlow = + new FanOutRecordsPublisher.RecordFlow(fanOutRecordsPublisher, Instant.now(), "Shard-001-1"); + final int[] totalRecordsRetrieved = { 0 }; + BlockingQueue ackQueue = new LinkedBlockingQueue<>(); + fanOutRecordsPublisher.subscribe(new Subscriber() { + @Override public void onSubscribe(Subscription subscription) {} + @Override public void onNext(RecordsRetrieved recordsRetrieved) { + totalRecordsRetrieved[0]++; + // Enqueue the ack for bursty delivery + ackQueue.add(recordsRetrieved.batchUniqueIdentifier()); + // Send stale event periodically + } + @Override public void onError(Throwable throwable) {} + @Override public void onComplete() {} + }); + IntStream.rangeClosed(1, 10).forEach(i -> fanOutRecordsPublisher.bufferCurrentEventAndScheduleIfRequired( + new FanOutRecordsPublisher.FanoutRecordsRetrieved(ProcessRecordsInput.builder().build(), i + "", recordFlow.getSubscribeToShardId()), + recordFlow)); + BatchUniqueIdentifier batchUniqueIdentifierQueued; + int count = 0; + // Now that we allowed upto 10 elements queued up, send a pair of good and stale ack to verify records + // delivered as expected. + while(count++ < 2 && (batchUniqueIdentifierQueued = ackQueue.poll(1000, TimeUnit.MILLISECONDS)) != null) { + final BatchUniqueIdentifier batchUniqueIdentifierFinal = batchUniqueIdentifierQueued; + fanOutRecordsPublisher.evictAckedEventAndScheduleNextEvent( + () -> new BatchUniqueIdentifier("some_uuid_str", batchUniqueIdentifierFinal.getFlowIdentifier())); + } + } + @Test public void acquireTimeoutTriggersLogMethodForActiveFlow() { AtomicBoolean acquireTimeoutLogged = new AtomicBoolean(false); @@ -477,6 +1124,36 @@ public class FanOutRecordsPublisherTest { } } + @RequiredArgsConstructor + private static class BackpressureAdheringServicePublisher implements Runnable { + + private final Consumer action; + private final Integer numOfTimes; + private final CountDownLatch taskCompletionLatch; + private final Semaphore demandNotifier; + + BackpressureAdheringServicePublisher(Consumer action, Integer numOfTimes, + CountDownLatch taskCompletionLatch, Integer initialDemand) { + this(action, numOfTimes, taskCompletionLatch, new Semaphore(initialDemand)); + } + + public void request(int n) { + demandNotifier.release(n); + } + + public void run() { + for (int i = 1; i <= numOfTimes; ) { + demandNotifier.acquireUninterruptibly(); + action.accept(i++); + } + taskCompletionLatch.countDown(); + } + } + + private Record makeRecord(String sequenceNumber) { + return makeRecord(Integer.parseInt(sequenceNumber)); + } + private Record makeRecord(int sequenceNumber) { SdkBytes buffer = SdkBytes.fromByteArray(new byte[] { 1, 2, 3 }); return Record.builder().data(buffer).approximateArrivalTimestamp(Instant.now()) diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java index 57d550d1..e569921a 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java @@ -71,6 +71,7 @@ import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException; import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; import software.amazon.awssdk.services.kinesis.model.Record; import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.lifecycle.ShardConsumerNotifyingSubscriber; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.metrics.NullMetricsFactory; import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; @@ -299,7 +300,7 @@ public class PrefetchRecordsPublisherTest { Object lock = new Object(); - Subscriber subscriber = new Subscriber() { + Subscriber delegateSubscriber = new Subscriber() { Subscription sub; @Override @@ -334,6 +335,8 @@ public class PrefetchRecordsPublisherTest { } }; + Subscriber subscriber = new ShardConsumerNotifyingSubscriber(delegateSubscriber, getRecordsCache); + synchronized (lock) { log.info("Awaiting notification"); Flowable.fromPublisher(getRecordsCache).subscribeOn(Schedulers.computation())