Preventing duplicate delivery due to unacknowledged event while completing the subscription (#596)

* Preventing duplicate delivery due to unacknowledged event while completing the subscription

* Refactored clearRecordsDeliveryQueue logic and added comments

* Code refactoring as per review comments

* Nit fix

* Add logging to unexpected subscription state scenario
This commit is contained in:
ashwing 2019-08-19 14:35:06 -07:00 committed by Micah Jaffe
parent 3f6afc6563
commit a17d14527a

View file

@ -64,6 +64,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
ThrowableType.ACQUIRE_TIMEOUT);
private static final ThrowableCategory READ_TIMEOUT_CATEGORY = new ThrowableCategory(ThrowableType.READ_TIMEOUT);
private static final int MAX_EVENT_BURST_FROM_SERVICE = 10;
private static final long TIME_TO_WAIT_FOR_FINAL_ACK_MILLIS = 1000;
private final KinesisAsyncClient kinesis;
private final String shardId;
@ -82,6 +83,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
private long availableQueueSpace = 0;
private BlockingQueue<RecordsRetrievedContext> recordsDeliveryQueue = new LinkedBlockingQueue<>(MAX_EVENT_BURST_FROM_SERVICE);
// Flag to indicate if the active subscription is being torn down.
private boolean pendingActiveSubscriptionShutdown = false;
@Override
public void start(ExtendedSequenceNumber extendedSequenceNumber,
@ -132,6 +135,13 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
triggeringFlow = evictAckedEventAndScheduleNextEvent(recordsDeliveryAck);
} catch (Throwable t) {
errorOccurred(triggeringFlow, t);
} finally {
// Notify all the actors who are waiting for the records ack event.
// Here, when the active subscription is being torn down, the completing thread will
// wait for the last delivered records to send back the ack, to prevent sending duplicate records.
if(pendingActiveSubscriptionShutdown) {
lockObject.notifyAll();
}
}
if (triggeringFlow != null) {
updateAvailableQueueSpaceAndRequestUpstream(triggeringFlow);
@ -160,8 +170,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
// Update the triggering flow for post scheduling upstream request.
flowToBeReturned = recordsRetrievedContext.getRecordFlow();
// Try scheduling the next event in the queue, if available.
if (recordsDeliveryQueue.peek() != null) {
subscriber.onNext(recordsDeliveryQueue.peek().getRecordsRetrieved());
if (!recordsDeliveryQueue.isEmpty()) {
scheduleNextEvent(recordsDeliveryQueue.peek().getRecordsRetrieved());
}
} else {
// Check if the mismatched event belongs to active flow. If publisher receives an ack for a
@ -194,7 +204,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
recordsDeliveryQueue.add(recordsRetrievedContext);
// If the current batch is the only element in the queue, then try scheduling the event delivery.
if (recordsDeliveryQueue.size() == 1) {
subscriber.onNext(recordsRetrieved);
scheduleNextEvent(recordsRetrieved);
}
} catch (IllegalStateException e) {
log.warn("{}: Unable to enqueue the payload due to capacity restrictions in delivery queue with remaining capacity {} ",
@ -206,6 +216,14 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
}
}
/**
 * Delivers the given batch to the subscriber, unless the active subscription is pending shutdown.
 *
 * <p>This method is not thread-safe; the caller must acquire the lock before invoking it.
 *
 * @param recordsRetrieved the batch to pass to {@code subscriber.onNext}
 */
private void scheduleNextEvent(RecordsRetrieved recordsRetrieved) {
    if (pendingActiveSubscriptionShutdown) {
        // The active subscription is being torn down: do not deliver anything further.
        return;
    }
    subscriber.onNext(recordsRetrieved);
}
@Data
private static final class RecordsRetrievedContext {
private final RecordsRetrieved recordsRetrieved;
@ -223,8 +241,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
private void subscribeToShard(String sequenceNumber) {
synchronized (lockObject) {
// Clear the queue so that any stale entries from previous subscription are discarded.
clearRecordsDeliveryQueue();
// Clear the delivery queue so that any stale entries from previous subscription are discarded.
resetRecordsDeliveryStateOnSubscriptionOnInit();
SubscribeToShardRequest.Builder builder = KinesisRequestsBuilder.subscribeToShardRequestBuilder()
.shardId(shardId).consumerARN(consumerArn);
SubscribeToShardRequest request;
@ -263,7 +281,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
}
// Clear the delivery buffer so that the next subscription doesn't yield duplicate records.
clearRecordsDeliveryQueue();
resetRecordsDeliveryStateOnSubscriptionShutdown();
Throwable propagationThrowable = t;
ThrowableCategory category = throwableCategory(propagationThrowable);
@ -313,8 +331,39 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
}
}
private void clearRecordsDeliveryQueue() {
recordsDeliveryQueue.clear();
/**
 * Resets the records-delivery state before a new subscription is started.
 *
 * <p>Discards any entries still sitting in {@code recordsDeliveryQueue} and restores
 * {@code pendingActiveSubscriptionShutdown} to {@code false}. Either condition being set at this
 * point is logged as a warning, since it indicates the previous subscription was not cleaned up.
 *
 * <p>This method is not thread-safe; it must be executed after acquiring the lock on
 * {@code this.lockObject}.
 */
private void resetRecordsDeliveryStateOnSubscriptionOnInit() {
    // Clear any lingering records in the queue.
    if (!recordsDeliveryQueue.isEmpty()) {
        // Note the trailing space after "of": without it the concatenated message reads "ofprevious".
        log.warn("{}: Found non-empty queue while starting subscription. This indicates unsuccessful clean up of "
                + "previous subscription - {} ", shardId, subscribeToShardId);
        recordsDeliveryQueue.clear();
    }
    if (pendingActiveSubscriptionShutdown) {
        log.warn("{}: Found current subscription to be in pendingShutdown state while initializing. This indicates unsuccessful clean up of "
                + "previous subscription - {} ", shardId, subscribeToShardId);
        // Set pendingActiveSubscriptionShutdown to default value.
        pendingActiveSubscriptionShutdown = false;
    }
}
/**
 * Resets the records-delivery state while the active subscription is shutting down.
 *
 * <p>If the delivery queue still holds entries, the subscription is flagged as pending shutdown
 * (which stops {@code scheduleNextEvent} from delivering more events) and the calling thread waits
 * on {@code lockObject} for at most {@code TIME_TO_WAIT_FOR_FINAL_ACK_MILLIS} for a notification
 * about an already delivered event. Afterwards the queue is emptied and the flag is restored.
 *
 * <p>This method is not thread-safe; it must be executed after acquiring the lock on
 * {@code this.lockObject}.
 */
private void resetRecordsDeliveryStateOnSubscriptionShutdown() {
    if (recordsDeliveryQueue.isEmpty()) {
        // Nothing is queued, so there is no final event notification to wait for.
        return;
    }

    // Block further scheduling of events for the duration of the wait.
    pendingActiveSubscriptionShutdown = true;
    try {
        // Give the already delivered event, if any, the configured time to be notified.
        lockObject.wait(TIME_TO_WAIT_FOR_FINAL_ACK_MILLIS);
    } catch (InterruptedException e) {
        // Preserve the interrupt status for callers and carry on with the clean up.
        Thread.currentThread().interrupt();
    }

    // Drop whatever is left in the queue.
    recordsDeliveryQueue.clear();
    // Restore pendingActiveSubscriptionShutdown to its default value.
    pendingActiveSubscriptionShutdown = false;
}
protected void logAcquireTimeoutMessage(Throwable t) {
@ -445,6 +494,9 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
synchronized (lockObject) {
log.debug("{}: [SubscriptionLifetime]: (FanOutRecordsPublisher#onComplete) @ {} id: {}", shardId,
triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId);
resetRecordsDeliveryStateOnSubscriptionShutdown();
triggeringFlow.cancel();
if (!hasValidSubscriber()) {
log.debug("{}: [SubscriptionLifetime]: (FanOutRecordsPublisher#onComplete) @ {} id: {}", shardId,