diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java index 708f6e59..c1f6bc80 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java @@ -78,6 +78,9 @@ import static software.amazon.kinesis.common.DiagnosticUtils.takeDelayedDelivery @KinesisClientInternalApi public class PrefetchRecordsPublisher implements RecordsPublisher { private static final String EXPIRED_ITERATOR_METRIC = "ExpiredIterator"; + // This package is used by all KCL clients, so the default timeout is kept at an upper threshold of 60 seconds + private static final long DEFAULT_AWAIT_TERMINATION_TIMEOUT_MILLIS = 60_000L; + private int maxPendingProcessRecordsInput; private int maxByteSize; private int maxRecordsCount; @@ -93,6 +96,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { private final String operation; private final StreamIdentifier streamId; private final String streamAndShardId; + private final long awaitTerminationTimeoutMillis; private Subscriber subscriber; @VisibleForTesting @Getter private final PublisherSession publisherSession; @@ -201,6 +205,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { * @param getRecordsRetrievalStrategy Retrieval strategy for the get records call * @param executorService Executor service for the cache * @param idleMillisBetweenCalls maximum time to wait before dispatching the next get records call + * @param awaitTerminationTimeoutMillis maximum time to wait for graceful shutdown of executorService */ public PrefetchRecordsPublisher(final int maxPendingProcessRecordsInput, final int maxByteSize, final int maxRecordsCount, final int maxRecordsPerCall, @@ -209,7 +214,8 @@ public
class PrefetchRecordsPublisher implements RecordsPublisher { final long idleMillisBetweenCalls, @NonNull final MetricsFactory metricsFactory, @NonNull final String operation, - @NonNull final String shardId) { + @NonNull final String shardId, + final long awaitTerminationTimeoutMillis) { this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy; this.maxRecordsPerCall = maxRecordsPerCall; this.maxPendingProcessRecordsInput = maxPendingProcessRecordsInput; @@ -225,6 +231,36 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { this.operation = operation; this.streamId = this.getRecordsRetrievalStrategy.dataFetcher().getStreamIdentifier(); this.streamAndShardId = this.streamId.serialize() + ":" + shardId; + this.awaitTerminationTimeoutMillis = awaitTerminationTimeoutMillis; + } + + /** + * Constructor for the PrefetchRecordsPublisher. This cache prefetches records from Kinesis and stores them in a + * LinkedBlockingQueue. + * + * @see PrefetchRecordsPublisher + * + * @param maxPendingProcessRecordsInput Max number of ProcessRecordsInput that can be held in the cache before + * blocking + * @param maxByteSize Max byte size of the queue before blocking next get records call + * @param maxRecordsCount Max number of records in the queue across all ProcessRecordInput objects + * @param maxRecordsPerCall Max records to be returned per call + * @param getRecordsRetrievalStrategy Retrieval strategy for the get records call + * @param executorService Executor service for the cache + * @param idleMillisBetweenCalls maximum time to wait before dispatching the next get records call + */ + public PrefetchRecordsPublisher(final int maxPendingProcessRecordsInput, final int maxByteSize, final int maxRecordsCount, + final int maxRecordsPerCall, + final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, + final ExecutorService executorService, + final long idleMillisBetweenCalls, + final MetricsFactory metricsFactory, + final String operation, + final 
String shardId) { + this(maxPendingProcessRecordsInput, maxByteSize, maxRecordsCount, maxRecordsPerCall, + getRecordsRetrievalStrategy, executorService, idleMillisBetweenCalls, + metricsFactory, operation, shardId, + DEFAULT_AWAIT_TERMINATION_TIMEOUT_MILLIS); } @Override @@ -260,7 +296,21 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { @Override public void shutdown() { defaultGetRecordsCacheDaemon.isShutdown = true; - executorService.shutdownNow(); + executorService.shutdown(); + try { + if (!executorService.awaitTermination(awaitTerminationTimeoutMillis, TimeUnit.MILLISECONDS)) { + executorService.shutdownNow(); + // Wait a while for tasks to respond to being cancelled + if (!executorService.awaitTermination(awaitTerminationTimeoutMillis, TimeUnit.MILLISECONDS)) { + log.error("Executor service didn't terminate"); + } + } + } catch (InterruptedException e) { + // (Re-)Cancel if current thread also interrupted + executorService.shutdownNow(); + // Preserve interrupt status + Thread.currentThread().interrupt(); + } started = false; } @@ -409,12 +459,15 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { break; } - resetLock.readLock().lock(); try { + resetLock.readLock().lock(); makeRetrievalAttempt(); } catch(PositionResetException pre) { log.debug("{} : Position was reset while attempting to add item to queue.", streamAndShardId); } catch (Throwable e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } log.error("{} : Unexpected exception was thrown. This could probably be an issue or a bug." + " Please search for the exception/error online to check what is going on. If the " + "issue persists or is a recurring problem, feel free to open an issue on, " + @@ -456,6 +509,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { } catch (RetryableRetrievalException rre) { log.info("{} : Timeout occurred while waiting for response from Kinesis. 
Will retry the request.", streamAndShardId); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); log.info("{} : Thread was interrupted, indicating shutdown was called on the cache.", streamAndShardId); } catch (ExpiredIteratorException e) { log.info("{} : records threw ExpiredIteratorException - restarting" @@ -482,6 +536,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { try { publisherSession.prefetchCounters().waitForConsumer(); } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); log.info("{} : Thread was interrupted while waiting for the consumer. " + "Shutdown has probably been started", streamAndShardId); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherIntegrationTest.java index f57a7a3a..5d757a6c 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherIntegrationTest.java @@ -81,6 +81,7 @@ public class PrefetchRecordsPublisherIntegrationTest { private static final int MAX_RECORDS_COUNT = 30_000; private static final int MAX_RECORDS_PER_CALL = 10_000; private static final long IDLE_MILLIS_BETWEEN_CALLS = 500L; + private static final long AWAIT_TERMINATION_TIMEOUT = 1L; private static final MetricsFactory NULL_METRICS_FACTORY = new NullMetricsFactory(); private PrefetchRecordsPublisher getRecordsCache; @@ -121,7 +122,8 @@ public class PrefetchRecordsPublisherIntegrationTest { IDLE_MILLIS_BETWEEN_CALLS, new NullMetricsFactory(), operation, - "test-shard"); + "test-shard", + AWAIT_TERMINATION_TIMEOUT); } @Test @@ -174,7 +176,8 @@ public class PrefetchRecordsPublisherIntegrationTest { IDLE_MILLIS_BETWEEN_CALLS, new 
NullMetricsFactory(), operation, - "test-shard-2"); + "test-shard-2", + AWAIT_TERMINATION_TIMEOUT); getRecordsCache.start(extendedSequenceNumber, initialPosition); sleep(IDLE_MILLIS_BETWEEN_CALLS); @@ -254,7 +257,7 @@ public class PrefetchRecordsPublisherIntegrationTest { public void shutdown() { getRecordsCache.shutdown(); sleep(100L); - verify(executorService).shutdownNow(); + verify(executorService).shutdown(); // verify(getRecordsRetrievalStrategy).shutdown(); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java index f2500867..8264f89d 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java @@ -108,6 +108,7 @@ public class PrefetchRecordsPublisherTest { private static final int MAX_SIZE = 5; private static final int MAX_RECORDS_COUNT = 15000; private static final long IDLE_MILLIS_BETWEEN_CALLS = 0L; + private static final long AWAIT_TERMINATION_TIMEOUT = 1L; private static final String NEXT_SHARD_ITERATOR = "testNextShardIterator"; @Mock @@ -143,7 +144,8 @@ public class PrefetchRecordsPublisherTest { IDLE_MILLIS_BETWEEN_CALLS, new NullMetricsFactory(), operation, - "shardId"); + "shardId", + AWAIT_TERMINATION_TIMEOUT); spyQueue = spy(getRecordsCache.getPublisherSession().prefetchRecordsQueue()); records = spy(new ArrayList<>()); getRecordsResponse = GetRecordsResponse.builder().records(records).nextShardIterator(NEXT_SHARD_ITERATOR).childShards(new ArrayList<>()).build(); @@ -224,7 +226,8 @@ public class PrefetchRecordsPublisherTest { 1000, new NullMetricsFactory(), operation, - "shardId"); + "shardId", + AWAIT_TERMINATION_TIMEOUT); // Setup the retrieval strategy to fail initial calls before 
succeeding when(getRecordsRetrievalStrategy.getRecords(eq(MAX_RECORDS_PER_CALL))).thenThrow(new RetryableRetrievalException("Timed out")).thenThrow(new @@ -258,7 +261,8 @@ public class PrefetchRecordsPublisherTest { 1000, new NullMetricsFactory(), operation, - "shardId"); + "shardId", + AWAIT_TERMINATION_TIMEOUT); // Setup the retrieval strategy to fail initial calls before succeeding when(getRecordsRetrievalStrategy.getRecords(eq(MAX_RECORDS_PER_CALL))).thenThrow(new RetryableRetrievalException("Timed out")).thenThrow(new @@ -770,7 +774,7 @@ public class PrefetchRecordsPublisherTest { @After public void shutdown() { getRecordsCache.shutdown(); - verify(executorService).shutdownNow(); + verify(executorService).shutdown(); } private void sleep(long millis) {