diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java index bf6cb189..b96ff8c1 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java @@ -168,7 +168,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { flowToBeReturned = recordsRetrievedContext.getRecordFlow(); // Try scheduling the next event in the queue or execute the subscription shutdown action. if (!recordsDeliveryQueue.isEmpty()) { - recordsDeliveryQueue.peek().executeEventAction(subscriber, recordsDeliveryQueue); + recordsDeliveryQueue.peek().executeEventAction(subscriber); } } else { // Check if the mismatched event belongs to active flow. If publisher receives an ack for a @@ -225,15 +225,9 @@ public class FanOutRecordsPublisher implements RecordsPublisher { } // This method is not thread-safe. You need to acquire a lock in the caller in order to execute this. - void executeEventAction(Subscriber subscriber, BlockingQueue recordsDeliveryQueue) { + void executeEventAction(Subscriber subscriber) { recordsOrShutdownEvent.apply(recordsEvent -> subscriber.onNext(recordsEvent), - shutdownEvent -> handleShutdownEvent(shutdownEvent, recordsDeliveryQueue)); - } - - private void handleShutdownEvent(SubscriptionShutdownEvent shutdownEvent, BlockingQueue recordsDeliveryQueue) { - //Removing the Shutdown Event from the queue before executing the Shutdown action. - recordsDeliveryQueue.poll(); - shutdownEvent.getSubscriptionShutdownAction().run(); + shutdownEvent -> shutdownEvent.getSubscriptionShutdownAction().run()); } } @@ -735,7 +729,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { executeExceptionOccurred(throwable); } else { final SubscriptionShutdownEvent subscriptionShutdownEvent = new SubscriptionShutdownEvent( - () -> executeExceptionOccurred(throwable), "onError", throwable); + () -> {parent.recordsDeliveryQueue.poll(); executeExceptionOccurred(throwable);}, "onError", throwable); tryEnqueueSubscriptionShutdownEvent(subscriptionShutdownEvent); } } @@ -774,7 +768,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { executeComplete(); } else { final SubscriptionShutdownEvent subscriptionShutdownEvent = new SubscriptionShutdownEvent( - () -> executeComplete(), "onComplete"); + () -> {parent.recordsDeliveryQueue.poll(); executeComplete();}, "onComplete"); tryEnqueueSubscriptionShutdownEvent(subscriptionShutdownEvent); } }