diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java index 708f6e59..31d751c5 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java @@ -78,6 +78,9 @@ import static software.amazon.kinesis.common.DiagnosticUtils.takeDelayedDelivery @KinesisClientInternalApi public class PrefetchRecordsPublisher implements RecordsPublisher { private static final String EXPIRED_ITERATOR_METRIC = "ExpiredIterator"; + // Since this package is being used by all KCL clients keeping the upper threshold of 60 seconds + private static final Duration AWAIT_TERMINATION_TIMEOUT = Duration.ofSeconds(60); + private int maxPendingProcessRecordsInput; private int maxByteSize; private int maxRecordsCount; @@ -260,6 +263,21 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { @Override public void shutdown() { defaultGetRecordsCacheDaemon.isShutdown = true; + executorService.shutdown(); + try { + if (!executorService.awaitTermination(AWAIT_TERMINATION_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)) { + executorService.shutdownNow(); + // Wait a while for tasks to respond to being cancelled + if (!executorService.awaitTermination(AWAIT_TERMINATION_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)) { + log.error("Executor service didn't terminate"); + } + } + } catch (InterruptedException e) { + // Preserve interrupt status + Thread.currentThread().interrupt(); + // (Re-)Cancel if current thread also interrupted + executorService.shutdownNow(); + } executorService.shutdownNow(); started = false; } @@ -409,12 +427,15 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { break; } - resetLock.readLock().lock(); try { + resetLock.readLock().lock(); makeRetrievalAttempt(); } catch(PositionResetException pre) { log.debug("{} : Position was reset while attempting to add item to queue.", streamAndShardId); } catch (Throwable e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } log.error("{} : Unexpected exception was thrown. This could probably be an issue or a bug." + " Please search for the exception/error online to check what is going on. If the " + "issue persists or is a recurring problem, feel free to open an issue on, " + @@ -456,6 +477,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { } catch (RetryableRetrievalException rre) { log.info("{} : Timeout occurred while waiting for response from Kinesis. Will retry the request.", streamAndShardId); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); log.info("{} : Thread was interrupted, indicating shutdown was called on the cache.", streamAndShardId); } catch (ExpiredIteratorException e) { log.info("{} : records threw ExpiredIteratorException - restarting" @@ -482,6 +504,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { try { publisherSession.prefetchCounters().waitForConsumer(); } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); log.info("{} : Thread was interrupted while waiting for the consumer. " + "Shutdown has probably been started", streamAndShardId); }