Rename outstandingRquests to availableQueueSpace (#375)

outstandingRequests was actually representing the available space in
the RxJava queue.  This renames it to better match reality.

Also changed to only make the request if there is available queue
space.  We now decrement availableQueueSpace ahead of determine
whether to request another item.
This commit is contained in:
Justin Pfifer 2018-08-20 16:17:25 -07:00 committed by Sahil Palvia
parent 5533d370cd
commit 90acdc02bf

View file

@ -63,7 +63,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
private boolean isFirstConnection = true;
private Subscriber<? super ProcessRecordsInput> subscriber;
private long outstandingRequests = 0;
private long availableQueueSpace = 0;
@Override
public void start(ExtendedSequenceNumber extendedSequenceNumber,
@ -129,8 +129,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
shardId, flow.connectionStartedAt, flow.subscribeToShardId, category, t);
flow.cancel();
}
log.debug("{}: outstandingRequests zeroing from {}", shardId, outstandingRequests);
outstandingRequests = 0;
log.debug("{}: availableQueueSpace zeroing from {}", shardId, availableQueueSpace);
availableQueueSpace = 0;
try {
handleFlowError(t);
@ -225,13 +225,15 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
errorOccurred(triggeringFlow, t);
}
if (outstandingRequests > 0) {
outstandingRequests--;
triggeringFlow.request(1);
} else {
if (availableQueueSpace <= 0) {
log.debug(
"{}: [SubscriptionLifetime] (FanOutRecordsPublisher#recordsReceived) @ {} id: {} -- Attempted to decrement outstandingRequests to below 0",
"{}: [SubscriptionLifetime] (FanOutRecordsPublisher#recordsReceived) @ {} id: {} -- Attempted to decrement availableQueueSpace to below 0",
shardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId);
} else {
availableQueueSpace--;
if (availableQueueSpace > 0) {
triggeringFlow.request(1);
}
}
}
}
@ -322,8 +324,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
errorOccurred(flow, new IllegalStateException("Attempted to request on a null flow."));
return;
}
long previous = outstandingRequests;
outstandingRequests += n;
long previous = availableQueueSpace;
availableQueueSpace += n;
if (previous <= 0) {
flow.request(1);
}
@ -349,7 +351,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
log.debug("{}: [SubscriptionLifetime]: (FanOutRecordsPublisher/Subscription#cancel) @ {} id: {}",
shardId, flow.connectionStartedAt, flow.subscribeToShardId);
flow.cancel();
outstandingRequests = 0;
availableQueueSpace = 0;
}
}
}
@ -591,8 +593,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
}
log.debug(
"{}: [SubscriptionLifetime]: (RecordSubscription#onSubscribe) @ {} id: {} -- Outstanding: {} items so requesting an item",
parent.shardId, connectionStartedAt, subscribeToShardId, parent.outstandingRequests);
if (parent.outstandingRequests > 0) {
parent.shardId, connectionStartedAt, subscribeToShardId, parent.availableQueueSpace);
if (parent.availableQueueSpace > 0) {
request(1);
}
}