Addressing comments

This commit is contained in:
Chunxue Yang 2019-11-01 14:08:57 -07:00
parent c5c4e428f2
commit 5239ed96ff

View file

@ -168,7 +168,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
flowToBeReturned = recordsRetrievedContext.getRecordFlow(); flowToBeReturned = recordsRetrievedContext.getRecordFlow();
// Try scheduling the next event in the queue or execute the subscription shutdown action. // Try scheduling the next event in the queue or execute the subscription shutdown action.
if (!recordsDeliveryQueue.isEmpty()) { if (!recordsDeliveryQueue.isEmpty()) {
recordsDeliveryQueue.peek().executeEventAction(subscriber, recordsDeliveryQueue); recordsDeliveryQueue.peek().executeEventAction(subscriber);
} }
} else { } else {
// Check if the mismatched event belongs to active flow. If publisher receives an ack for a // Check if the mismatched event belongs to active flow. If publisher receives an ack for a
@ -225,15 +225,9 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
} }
// This method is not thread-safe. You need to acquire a lock in the caller in order to execute this. // This method is not thread-safe. You need to acquire a lock in the caller in order to execute this.
void executeEventAction(Subscriber<? super RecordsRetrieved> subscriber, BlockingQueue<RecordsRetrievedContext> recordsDeliveryQueue) { void executeEventAction(Subscriber<? super RecordsRetrieved> subscriber) {
recordsOrShutdownEvent.apply(recordsEvent -> subscriber.onNext(recordsEvent), recordsOrShutdownEvent.apply(recordsEvent -> subscriber.onNext(recordsEvent),
shutdownEvent -> handleShutdownEvent(shutdownEvent, recordsDeliveryQueue)); shutdownEvent -> shutdownEvent.getSubscriptionShutdownAction().run());
}
private void handleShutdownEvent(SubscriptionShutdownEvent shutdownEvent, BlockingQueue<RecordsRetrievedContext> recordsDeliveryQueue) {
//Removing the Shutdown Event from the queue before executing the Shutdown action.
recordsDeliveryQueue.poll();
shutdownEvent.getSubscriptionShutdownAction().run();
} }
} }
@ -735,7 +729,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
executeExceptionOccurred(throwable); executeExceptionOccurred(throwable);
} else { } else {
final SubscriptionShutdownEvent subscriptionShutdownEvent = new SubscriptionShutdownEvent( final SubscriptionShutdownEvent subscriptionShutdownEvent = new SubscriptionShutdownEvent(
() -> executeExceptionOccurred(throwable), "onError", throwable); () -> {parent.recordsDeliveryQueue.poll(); executeExceptionOccurred(throwable);}, "onError", throwable);
tryEnqueueSubscriptionShutdownEvent(subscriptionShutdownEvent); tryEnqueueSubscriptionShutdownEvent(subscriptionShutdownEvent);
} }
} }
@ -774,7 +768,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
executeComplete(); executeComplete();
} else { } else {
final SubscriptionShutdownEvent subscriptionShutdownEvent = new SubscriptionShutdownEvent( final SubscriptionShutdownEvent subscriptionShutdownEvent = new SubscriptionShutdownEvent(
() -> executeComplete(), "onComplete"); () -> {parent.recordsDeliveryQueue.poll(); executeComplete();}, "onComplete");
tryEnqueueSubscriptionShutdownEvent(subscriptionShutdownEvent); tryEnqueueSubscriptionShutdownEvent(subscriptionShutdownEvent);
} }
} }