Don't allow activities if the subscriber is current, or the connection to Kinesis is broken (#374)

* Added missing lock around the call to request.

Calls to Subscription#request weren't synchronized correctly.  This
was only really an issue if there is a large number of errors
occurring.

* Reject operations where the subscriber doesn't match.

If the original subscriber doesn't match the current subscriber reject
operations completely.
If the flow is null, but the subscriber still matches error out the
subscription.  The original subscriber will restart.

For canceling only accept the cancel request if the original
subscriber matches the current subscriber.

* Remove unneeded if statement

Don't really need to check if the subscriber is still current, as this
is synchronized.
This commit is contained in:
Justin Pfifer 2018-08-20 13:45:48 -07:00 committed by Sahil Palvia
parent c1e38f0126
commit 5533d370cd

View file

@ -305,18 +305,44 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
subscriber.onSubscribe(new Subscription() { subscriber.onSubscribe(new Subscription() {
@Override @Override
public void request(long n) { public void request(long n) {
synchronized (lockObject) {
if (subscriber != s) {
log.warn(
"{}: (FanOutRecordsPublisher/Subscription#request) - Rejected an attempt to request({}), because subscribers don't match.",
shardId, n);
return;
}
if (flow == null) {
//
// Flow has been terminated, so we can't make any requests on it anymore.
//
log.debug(
"{}: (FanOutRecordsPublisher/Subscription#request) - Request called for a null flow.",
shardId);
errorOccurred(flow, new IllegalStateException("Attempted to request on a null flow."));
return;
}
long previous = outstandingRequests; long previous = outstandingRequests;
outstandingRequests += n; outstandingRequests += n;
if (previous <= 0) { if (previous <= 0) {
flow.request(1); flow.request(1);
} }
} }
}
@Override @Override
public void cancel() { public void cancel() {
synchronized (lockObject) { synchronized (lockObject) {
if (subscriber != s) {
log.warn(
"{}: (FanOutRecordsPublisher/Subscription#cancel) - Rejected attempt to cancel subscription, because subscribers don't match.",
shardId);
return;
}
if (!hasValidSubscriber()) { if (!hasValidSubscriber()) {
log.warn("{}: Cancelled called even with an invalid subscriber", shardId); log.warn(
"{}: (FanOutRecordsPublisher/Subscription#cancel) - Cancelled called even with an invalid subscriber",
shardId);
} }
subscriber = null; subscriber = null;
if (flow != null) { if (flow != null) {