From 5533d370cdf42a6615799a5a1c65a2a7902b4e8e Mon Sep 17 00:00:00 2001 From: Justin Pfifer Date: Mon, 20 Aug 2018 13:45:48 -0700 Subject: [PATCH] Don't allow activities if the subscriber is current, or the connection to Kinesis is broken (#374) * Added missing lock around the call to request. Calls to Subscription#request weren't synchronized correctly. This was only really an issue if there is a large number of errors occurring. * Reject operations where the subscriber doesn't match. If the original subscriber doesn't match the current subscriber reject operations completely. If the flow is null, but the subscriber still matches error out the subscription. The original subscriber will restart. For canceling only accept the cancel request if the original subscriber matches the current subscriber. * Remove unneeded if statement Don't really need to check if the subscriber is still current, as this is synchronized. --- .../fanout/FanOutRecordsPublisher.java | 36 ++++++++++++++++--- 1 file changed, 31 insertions(+), 5 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java index 8d9ede58..6821f749 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java @@ -305,18 +305,44 @@ public class FanOutRecordsPublisher implements RecordsPublisher { subscriber.onSubscribe(new Subscription() { @Override public void request(long n) { - long previous = outstandingRequests; - outstandingRequests += n; - if (previous <= 0) { - flow.request(1); + synchronized (lockObject) { + if (subscriber != s) { + log.warn( + "{}: (FanOutRecordsPublisher/Subscription#request) - Rejected an attempt to request({}), because subscribers don't match.", + shardId, n); + return; + } + if (flow == null) { + // + // Flow has been terminated, so we can't make any requests on it anymore. + // + log.debug( + "{}: (FanOutRecordsPublisher/Subscription#request) - Request called for a null flow.", + shardId); + errorOccurred(flow, new IllegalStateException("Attempted to request on a null flow.")); + return; + } + long previous = outstandingRequests; + outstandingRequests += n; + if (previous <= 0) { + flow.request(1); + } } } @Override public void cancel() { synchronized (lockObject) { + if (subscriber != s) { + log.warn( + "{}: (FanOutRecordsPublisher/Subscription#cancel) - Rejected attempt to cancel subscription, because subscribers don't match.", + shardId); + return; + } if (!hasValidSubscriber()) { - log.warn("{}: Cancelled called even with an invalid subscriber", shardId); + log.warn( + "{}: (FanOutRecordsPublisher/Subscription#cancel) - Cancelled called even with an invalid subscriber", + shardId); } subscriber = null; if (flow != null) {