diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java index 426ba6c9..b96ff8c1 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java @@ -227,7 +227,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { // This method is not thread-safe. You need to acquire a lock in the caller in order to execute this. void executeEventAction(Subscriber subscriber) { recordsOrShutdownEvent.apply(recordsEvent -> subscriber.onNext(recordsEvent), - shutdownEvent -> shutdownEvent.getSubscriptionShutdownAction().run()); + shutdownEvent -> shutdownEvent.getSubscriptionShutdownAction().run()); } } @@ -729,7 +729,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { executeExceptionOccurred(throwable); } else { final SubscriptionShutdownEvent subscriptionShutdownEvent = new SubscriptionShutdownEvent( - () -> executeExceptionOccurred(throwable), "onError", throwable); + () -> {parent.recordsDeliveryQueue.poll(); executeExceptionOccurred(throwable);}, "onError", throwable); tryEnqueueSubscriptionShutdownEvent(subscriptionShutdownEvent); } } @@ -768,7 +768,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { executeComplete(); } else { final SubscriptionShutdownEvent subscriptionShutdownEvent = new SubscriptionShutdownEvent( - () -> executeComplete(), "onComplete"); + () -> {parent.recordsDeliveryQueue.poll(); executeComplete();}, "onComplete"); tryEnqueueSubscriptionShutdownEvent(subscriptionShutdownEvent); } }