From 6fd59b326ab1eb6f3a6c83a8604de03ee510d5e3 Mon Sep 17 00:00:00 2001 From: madagascar22 Date: Wed, 6 Oct 2021 17:35:09 -0700 Subject: [PATCH] Fix to shut down PrefetchRecordsPublisher in a graceful manner (#857) * Fix to shut down PrefetchRecordsPublisher in a graceful manner Previously, when the lease expired, PrefetchRecordsPublisher shut down the process forcefully by interrupting the threads, which led to a leak of Apache HTTP client connections. Now the code shuts down the PrefetchRecordsPublisher process in a more graceful manner and handles the interrupted exception. * Fixed failing unit test * Add awaitTerminationTimeoutMillis as a parameter for PrefetchRecordsPublisher Since clients can configure their own awaitTerminationTimeoutMillis, add it as a separate parameter with a default value * Fix setting interrupt status after shutdown Co-authored-by: Monishkumar Gajendran --- .../polling/PrefetchRecordsPublisher.java | 61 ++++++++++++++++++- ...efetchRecordsPublisherIntegrationTest.java | 9 ++- .../polling/PrefetchRecordsPublisherTest.java | 12 ++-- 3 files changed, 72 insertions(+), 10 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java index 708f6e59..c1f6bc80 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java @@ -78,6 +78,9 @@ import static software.amazon.kinesis.common.DiagnosticUtils.takeDelayedDelivery @KinesisClientInternalApi public class PrefetchRecordsPublisher implements RecordsPublisher { private static final String EXPIRED_ITERATOR_METRIC = "ExpiredIterator"; + // Since this package is being used by all KCL clients keeping the upper threshold of 60 seconds + private static final long 
DEFAULT_AWAIT_TERMINATION_TIMEOUT_MILLIS = 60_000L; + private int maxPendingProcessRecordsInput; private int maxByteSize; private int maxRecordsCount; @@ -93,6 +96,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { private final String operation; private final StreamIdentifier streamId; private final String streamAndShardId; + private final long awaitTerminationTimeoutMillis; private Subscriber subscriber; @VisibleForTesting @Getter private final PublisherSession publisherSession; @@ -201,6 +205,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { * @param getRecordsRetrievalStrategy Retrieval strategy for the get records call * @param executorService Executor service for the cache * @param idleMillisBetweenCalls maximum time to wait before dispatching the next get records call + * @param awaitTerminationTimeoutMillis maximum time to wait for graceful shutdown of executorService */ public PrefetchRecordsPublisher(final int maxPendingProcessRecordsInput, final int maxByteSize, final int maxRecordsCount, final int maxRecordsPerCall, @@ -209,7 +214,8 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { final long idleMillisBetweenCalls, @NonNull final MetricsFactory metricsFactory, @NonNull final String operation, - @NonNull final String shardId) { + @NonNull final String shardId, + final long awaitTerminationTimeoutMillis) { this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy; this.maxRecordsPerCall = maxRecordsPerCall; this.maxPendingProcessRecordsInput = maxPendingProcessRecordsInput; @@ -225,6 +231,36 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { this.operation = operation; this.streamId = this.getRecordsRetrievalStrategy.dataFetcher().getStreamIdentifier(); this.streamAndShardId = this.streamId.serialize() + ":" + shardId; + this.awaitTerminationTimeoutMillis = awaitTerminationTimeoutMillis; + } + + /** + * Constructor for the PrefetchRecordsPublisher. 
This cache prefetches records from Kinesis and stores them in a + * LinkedBlockingQueue. + * + * @see PrefetchRecordsPublisher + * + * @param maxPendingProcessRecordsInput Max number of ProcessRecordsInput that can be held in the cache before + * blocking + * @param maxByteSize Max byte size of the queue before blocking next get records call + * @param maxRecordsCount Max number of records in the queue across all ProcessRecordInput objects + * @param maxRecordsPerCall Max records to be returned per call + * @param getRecordsRetrievalStrategy Retrieval strategy for the get records call + * @param executorService Executor service for the cache + * @param idleMillisBetweenCalls maximum time to wait before dispatching the next get records call + */ + public PrefetchRecordsPublisher(final int maxPendingProcessRecordsInput, final int maxByteSize, final int maxRecordsCount, + final int maxRecordsPerCall, + final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, + final ExecutorService executorService, + final long idleMillisBetweenCalls, + final MetricsFactory metricsFactory, + final String operation, + final String shardId) { + this(maxPendingProcessRecordsInput, maxByteSize, maxRecordsCount, maxRecordsPerCall, + getRecordsRetrievalStrategy, executorService, idleMillisBetweenCalls, + metricsFactory, operation, shardId, + DEFAULT_AWAIT_TERMINATION_TIMEOUT_MILLIS); } @Override @@ -260,7 +296,21 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { @Override public void shutdown() { defaultGetRecordsCacheDaemon.isShutdown = true; - executorService.shutdownNow(); + executorService.shutdown(); + try { + if (!executorService.awaitTermination(awaitTerminationTimeoutMillis, TimeUnit.MILLISECONDS)) { + executorService.shutdownNow(); + // Wait a while for tasks to respond to being cancelled + if (!executorService.awaitTermination(awaitTerminationTimeoutMillis, TimeUnit.MILLISECONDS)) { + log.error("Executor service didn't terminate"); + } + } + } catch 
(InterruptedException e) { + // (Re-)Cancel if current thread also interrupted + executorService.shutdownNow(); + // Preserve interrupt status + Thread.currentThread().interrupt(); + } started = false; } @@ -409,12 +459,15 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { break; } - resetLock.readLock().lock(); try { + resetLock.readLock().lock(); makeRetrievalAttempt(); } catch(PositionResetException pre) { log.debug("{} : Position was reset while attempting to add item to queue.", streamAndShardId); } catch (Throwable e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } log.error("{} : Unexpected exception was thrown. This could probably be an issue or a bug." + " Please search for the exception/error online to check what is going on. If the " + "issue persists or is a recurring problem, feel free to open an issue on, " + @@ -456,6 +509,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { } catch (RetryableRetrievalException rre) { log.info("{} : Timeout occurred while waiting for response from Kinesis. Will retry the request.", streamAndShardId); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); log.info("{} : Thread was interrupted, indicating shutdown was called on the cache.", streamAndShardId); } catch (ExpiredIteratorException e) { log.info("{} : records threw ExpiredIteratorException - restarting" @@ -482,6 +536,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { try { publisherSession.prefetchCounters().waitForConsumer(); } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); log.info("{} : Thread was interrupted while waiting for the consumer. 
" + "Shutdown has probably been started", streamAndShardId); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherIntegrationTest.java index f57a7a3a..5d757a6c 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherIntegrationTest.java @@ -81,6 +81,7 @@ public class PrefetchRecordsPublisherIntegrationTest { private static final int MAX_RECORDS_COUNT = 30_000; private static final int MAX_RECORDS_PER_CALL = 10_000; private static final long IDLE_MILLIS_BETWEEN_CALLS = 500L; + private static final long AWAIT_TERMINATION_TIMEOUT = 1L; private static final MetricsFactory NULL_METRICS_FACTORY = new NullMetricsFactory(); private PrefetchRecordsPublisher getRecordsCache; @@ -121,7 +122,8 @@ public class PrefetchRecordsPublisherIntegrationTest { IDLE_MILLIS_BETWEEN_CALLS, new NullMetricsFactory(), operation, - "test-shard"); + "test-shard", + AWAIT_TERMINATION_TIMEOUT); } @Test @@ -174,7 +176,8 @@ public class PrefetchRecordsPublisherIntegrationTest { IDLE_MILLIS_BETWEEN_CALLS, new NullMetricsFactory(), operation, - "test-shard-2"); + "test-shard-2", + AWAIT_TERMINATION_TIMEOUT); getRecordsCache.start(extendedSequenceNumber, initialPosition); sleep(IDLE_MILLIS_BETWEEN_CALLS); @@ -254,7 +257,7 @@ public class PrefetchRecordsPublisherIntegrationTest { public void shutdown() { getRecordsCache.shutdown(); sleep(100L); - verify(executorService).shutdownNow(); + verify(executorService).shutdown(); // verify(getRecordsRetrievalStrategy).shutdown(); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java 
b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java index f2500867..8264f89d 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java @@ -108,6 +108,7 @@ public class PrefetchRecordsPublisherTest { private static final int MAX_SIZE = 5; private static final int MAX_RECORDS_COUNT = 15000; private static final long IDLE_MILLIS_BETWEEN_CALLS = 0L; + private static final long AWAIT_TERMINATION_TIMEOUT = 1L; private static final String NEXT_SHARD_ITERATOR = "testNextShardIterator"; @Mock @@ -143,7 +144,8 @@ public class PrefetchRecordsPublisherTest { IDLE_MILLIS_BETWEEN_CALLS, new NullMetricsFactory(), operation, - "shardId"); + "shardId", + AWAIT_TERMINATION_TIMEOUT); spyQueue = spy(getRecordsCache.getPublisherSession().prefetchRecordsQueue()); records = spy(new ArrayList<>()); getRecordsResponse = GetRecordsResponse.builder().records(records).nextShardIterator(NEXT_SHARD_ITERATOR).childShards(new ArrayList<>()).build(); @@ -224,7 +226,8 @@ public class PrefetchRecordsPublisherTest { 1000, new NullMetricsFactory(), operation, - "shardId"); + "shardId", + AWAIT_TERMINATION_TIMEOUT); // Setup the retrieval strategy to fail initial calls before succeeding when(getRecordsRetrievalStrategy.getRecords(eq(MAX_RECORDS_PER_CALL))).thenThrow(new RetryableRetrievalException("Timed out")).thenThrow(new @@ -258,7 +261,8 @@ public class PrefetchRecordsPublisherTest { 1000, new NullMetricsFactory(), operation, - "shardId"); + "shardId", + AWAIT_TERMINATION_TIMEOUT); // Setup the retrieval strategy to fail initial calls before succeeding when(getRecordsRetrievalStrategy.getRecords(eq(MAX_RECORDS_PER_CALL))).thenThrow(new RetryableRetrievalException("Timed out")).thenThrow(new @@ -770,7 +774,7 @@ public class PrefetchRecordsPublisherTest { 
@After public void shutdown() { getRecordsCache.shutdown(); - verify(executorService).shutdownNow(); + verify(executorService).shutdown(); } private void sleep(long millis) {