From c5c4e428f294ade38e30f84914b44e29cb156a02 Mon Sep 17 00:00:00 2001 From: Chunxue Yang Date: Fri, 1 Nov 2019 09:47:24 -0700 Subject: [PATCH] Remove the shutdown event from the queue before executing the shudown event --- .../retrieval/fanout/FanOutRecordsPublisher.java | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java index 426ba6c9..bf6cb189 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java @@ -168,7 +168,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { flowToBeReturned = recordsRetrievedContext.getRecordFlow(); // Try scheduling the next event in the queue or execute the subscription shutdown action. if (!recordsDeliveryQueue.isEmpty()) { - recordsDeliveryQueue.peek().executeEventAction(subscriber); + recordsDeliveryQueue.peek().executeEventAction(subscriber, recordsDeliveryQueue); } } else { // Check if the mismatched event belongs to active flow. If publisher receives an ack for a @@ -225,9 +225,15 @@ public class FanOutRecordsPublisher implements RecordsPublisher { } // This method is not thread-safe. You need to acquire a lock in the caller in order to execute this. - void executeEventAction(Subscriber subscriber) { + void executeEventAction(Subscriber subscriber, BlockingQueue recordsDeliveryQueue) { recordsOrShutdownEvent.apply(recordsEvent -> subscriber.onNext(recordsEvent), - shutdownEvent -> shutdownEvent.getSubscriptionShutdownAction().run()); + shutdownEvent -> handleShutdownEvent(shutdownEvent, recordsDeliveryQueue)); + } + + private void handleShutdownEvent(SubscriptionShutdownEvent shutdownEvent, BlockingQueue recordsDeliveryQueue) { + //Removing the Shutdown Event from the queue before executing the Shutdown action. + recordsDeliveryQueue.poll(); + shutdownEvent.getSubscriptionShutdownAction().run(); } }