From 90acdc02bfe4736b2d57f8d600220d950d13138a Mon Sep 17 00:00:00 2001 From: Justin Pfifer Date: Mon, 20 Aug 2018 16:17:25 -0700 Subject: [PATCH] Rename outstandingRquests to availableQueueSpace (#375) outstandingRequests was actually representing the available space in the RxJava queue. This renames it to better match reality. Also changed to only make the request if there is available queue space. We now decrement availableQueueSpace ahead of determine whether to request another item. --- .../fanout/FanOutRecordsPublisher.java | 28 ++++++++++--------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java index 6821f749..978cd9f9 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java @@ -63,7 +63,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { private boolean isFirstConnection = true; private Subscriber subscriber; - private long outstandingRequests = 0; + private long availableQueueSpace = 0; @Override public void start(ExtendedSequenceNumber extendedSequenceNumber, @@ -129,8 +129,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher { shardId, flow.connectionStartedAt, flow.subscribeToShardId, category, t); flow.cancel(); } - log.debug("{}: outstandingRequests zeroing from {}", shardId, outstandingRequests); - outstandingRequests = 0; + log.debug("{}: availableQueueSpace zeroing from {}", shardId, availableQueueSpace); + availableQueueSpace = 0; try { handleFlowError(t); @@ -225,13 +225,15 @@ public class FanOutRecordsPublisher implements RecordsPublisher { errorOccurred(triggeringFlow, t); } - if (outstandingRequests > 0) { - outstandingRequests--; - triggeringFlow.request(1); - } else { + if (availableQueueSpace <= 0) { log.debug( - "{}: [SubscriptionLifetime] (FanOutRecordsPublisher#recordsReceived) @ {} id: {} -- Attempted to decrement outstandingRequests to below 0", + "{}: [SubscriptionLifetime] (FanOutRecordsPublisher#recordsReceived) @ {} id: {} -- Attempted to decrement availableQueueSpace to below 0", shardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId); + } else { + availableQueueSpace--; + if (availableQueueSpace > 0) { + triggeringFlow.request(1); + } } } } @@ -322,8 +324,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher { errorOccurred(flow, new IllegalStateException("Attempted to request on a null flow.")); return; } - long previous = outstandingRequests; - outstandingRequests += n; + long previous = availableQueueSpace; + availableQueueSpace += n; if (previous <= 0) { flow.request(1); } @@ -349,7 +351,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { log.debug("{}: [SubscriptionLifetime]: (FanOutRecordsPublisher/Subscription#cancel) @ {} id: {}", shardId, flow.connectionStartedAt, flow.subscribeToShardId); flow.cancel(); - outstandingRequests = 0; + availableQueueSpace = 0; } } } @@ -591,8 +593,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher { } log.debug( "{}: [SubscriptionLifetime]: (RecordSubscription#onSubscribe) @ {} id: {} -- Outstanding: {} items so requesting an item", - parent.shardId, connectionStartedAt, subscribeToShardId, parent.outstandingRequests); - if (parent.outstandingRequests > 0) { + parent.shardId, connectionStartedAt, subscribeToShardId, parent.availableQueueSpace); + if (parent.availableQueueSpace > 0) { request(1); } }