diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java index 8d9ede58..6821f749 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java @@ -305,18 +305,44 @@ public class FanOutRecordsPublisher implements RecordsPublisher { subscriber.onSubscribe(new Subscription() { @Override public void request(long n) { - long previous = outstandingRequests; - outstandingRequests += n; - if (previous <= 0) { - flow.request(1); + synchronized (lockObject) { + if (subscriber != s) { + log.warn( + "{}: (FanOutRecordsPublisher/Subscription#request) - Rejected an attempt to request({}), because subscribers don't match.", + shardId, n); + return; + } + if (flow == null) { + // + // Flow has been terminated, so we can't make any requests on it anymore. + // + log.debug( + "{}: (FanOutRecordsPublisher/Subscription#request) - Request called for a null flow.", + shardId); + errorOccurred(flow, new IllegalStateException("Attempted to request on a null flow.")); + return; + } + long previous = outstandingRequests; + outstandingRequests += n; + if (previous <= 0) { + flow.request(1); + } } } @Override public void cancel() { synchronized (lockObject) { + if (subscriber != s) { + log.warn( + "{}: (FanOutRecordsPublisher/Subscription#cancel) - Rejected attempt to cancel subscription, because subscribers don't match.", + shardId); + return; + } if (!hasValidSubscriber()) { - log.warn("{}: Cancelled called even with an invalid subscriber", shardId); + log.warn( + "{}: (FanOutRecordsPublisher/Subscription#cancel) - Cancelled called even with an invalid subscriber", + shardId); } subscriber = null; if (flow != null) {