Remove the shutdown event from the queue before executing the shudown event
This commit is contained in:
parent
c7754c4eda
commit
c5c4e428f2
1 changed files with 9 additions and 3 deletions
|
|
@ -168,7 +168,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
|
|||
flowToBeReturned = recordsRetrievedContext.getRecordFlow();
|
||||
// Try scheduling the next event in the queue or execute the subscription shutdown action.
|
||||
if (!recordsDeliveryQueue.isEmpty()) {
|
||||
recordsDeliveryQueue.peek().executeEventAction(subscriber);
|
||||
recordsDeliveryQueue.peek().executeEventAction(subscriber, recordsDeliveryQueue);
|
||||
}
|
||||
} else {
|
||||
// Check if the mismatched event belongs to active flow. If publisher receives an ack for a
|
||||
|
|
@ -225,9 +225,15 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
|
|||
}
|
||||
|
||||
// This method is not thread-safe. You need to acquire a lock in the caller in order to execute this.
|
||||
void executeEventAction(Subscriber<? super RecordsRetrieved> subscriber) {
|
||||
void executeEventAction(Subscriber<? super RecordsRetrieved> subscriber, BlockingQueue<RecordsRetrievedContext> recordsDeliveryQueue) {
|
||||
recordsOrShutdownEvent.apply(recordsEvent -> subscriber.onNext(recordsEvent),
|
||||
shutdownEvent -> shutdownEvent.getSubscriptionShutdownAction().run());
|
||||
shutdownEvent -> handleShutdownEvent(shutdownEvent, recordsDeliveryQueue));
|
||||
}
|
||||
|
||||
private void handleShutdownEvent(SubscriptionShutdownEvent shutdownEvent, BlockingQueue<RecordsRetrievedContext> recordsDeliveryQueue) {
|
||||
//Removing the Shutdown Event from the queue before executing the Shutdown action.
|
||||
recordsDeliveryQueue.poll();
|
||||
shutdownEvent.getSubscriptionShutdownAction().run();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue