Merge pull request #642 from ychunxue/master

Remove the shutdown event from the queue before executing the shudown…
This commit is contained in:
ychunxue 2019-11-01 15:45:32 -07:00 committed by GitHub
commit 047493aa9e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -227,7 +227,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
// This method is not thread-safe. You need to acquire a lock in the caller in order to execute this. // This method is not thread-safe. You need to acquire a lock in the caller in order to execute this.
void executeEventAction(Subscriber<? super RecordsRetrieved> subscriber) { void executeEventAction(Subscriber<? super RecordsRetrieved> subscriber) {
recordsOrShutdownEvent.apply(recordsEvent -> subscriber.onNext(recordsEvent), recordsOrShutdownEvent.apply(recordsEvent -> subscriber.onNext(recordsEvent),
shutdownEvent -> shutdownEvent.getSubscriptionShutdownAction().run()); shutdownEvent -> shutdownEvent.getSubscriptionShutdownAction().run());
} }
} }
@ -729,7 +729,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
executeExceptionOccurred(throwable); executeExceptionOccurred(throwable);
} else { } else {
final SubscriptionShutdownEvent subscriptionShutdownEvent = new SubscriptionShutdownEvent( final SubscriptionShutdownEvent subscriptionShutdownEvent = new SubscriptionShutdownEvent(
() -> executeExceptionOccurred(throwable), "onError", throwable); () -> {parent.recordsDeliveryQueue.poll(); executeExceptionOccurred(throwable);}, "onError", throwable);
tryEnqueueSubscriptionShutdownEvent(subscriptionShutdownEvent); tryEnqueueSubscriptionShutdownEvent(subscriptionShutdownEvent);
} }
} }
@ -768,7 +768,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
executeComplete(); executeComplete();
} else { } else {
final SubscriptionShutdownEvent subscriptionShutdownEvent = new SubscriptionShutdownEvent( final SubscriptionShutdownEvent subscriptionShutdownEvent = new SubscriptionShutdownEvent(
() -> executeComplete(), "onComplete"); () -> {parent.recordsDeliveryQueue.poll(); executeComplete();}, "onComplete");
tryEnqueueSubscriptionShutdownEvent(subscriptionShutdownEvent); tryEnqueueSubscriptionShutdownEvent(subscriptionShutdownEvent);
} }
} }