diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java index 98829629..233d9fd5 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java @@ -79,7 +79,7 @@ import static software.amazon.kinesis.common.DiagnosticUtils.takeDelayedDelivery public class PrefetchRecordsPublisher implements RecordsPublisher { private static final String EXPIRED_ITERATOR_METRIC = "ExpiredIterator"; // Since this package is being used by all KCL clients keeping the upper threshold of 60 seconds - private static final Duration AWAIT_TERMINATION_TIMEOUT = Duration.ofSeconds(60); + private static final long DEFAULT_AWAIT_TERMINATION_TIMEOUT_MILLIS = 60_000L; private int maxPendingProcessRecordsInput; private int maxByteSize; @@ -96,6 +96,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { private final String operation; private final StreamIdentifier streamId; private final String streamAndShardId; + private final long awaitTerminationTimeoutMillis; private Subscriber subscriber; @VisibleForTesting @Getter private final PublisherSession publisherSession; @@ -204,6 +205,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { * @param getRecordsRetrievalStrategy Retrieval strategy for the get records call * @param executorService Executor service for the cache * @param idleMillisBetweenCalls maximum time to wait before dispatching the next get records call + * @param awaitTerminationTimeoutMillis maximum time to wait for graceful shutdown of executorService */ public PrefetchRecordsPublisher(final int maxPendingProcessRecordsInput, final int maxByteSize, final int maxRecordsCount, final int maxRecordsPerCall, @@ -212,7 +214,8 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { final long idleMillisBetweenCalls, @NonNull final MetricsFactory metricsFactory, @NonNull final String operation, - @NonNull final String shardId) { + @NonNull final String shardId, + final long awaitTerminationTimeoutMillis) { this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy; this.maxRecordsPerCall = maxRecordsPerCall; this.maxPendingProcessRecordsInput = maxPendingProcessRecordsInput; @@ -228,6 +231,36 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { this.operation = operation; this.streamId = this.getRecordsRetrievalStrategy.dataFetcher().getStreamIdentifier(); this.streamAndShardId = this.streamId.serialize() + ":" + shardId; + this.awaitTerminationTimeoutMillis = awaitTerminationTimeoutMillis; + } + + /** + * Constructor for the PrefetchRecordsPublisher. This cache prefetches records from Kinesis and stores them in a + * LinkedBlockingQueue. + * + * @see PrefetchRecordsPublisher + * + * @param maxPendingProcessRecordsInput Max number of ProcessRecordsInput that can be held in the cache before + * blocking + * @param maxByteSize Max byte size of the queue before blocking next get records call + * @param maxRecordsCount Max number of records in the queue across all ProcessRecordInput objects + * @param maxRecordsPerCall Max records to be returned per call + * @param getRecordsRetrievalStrategy Retrieval strategy for the get records call + * @param executorService Executor service for the cache + * @param idleMillisBetweenCalls maximum time to wait before dispatching the next get records call + */ + public PrefetchRecordsPublisher(final int maxPendingProcessRecordsInput, final int maxByteSize, final int maxRecordsCount, + final int maxRecordsPerCall, + final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, + final ExecutorService executorService, + final long idleMillisBetweenCalls, + final MetricsFactory metricsFactory, + final String operation, + final String shardId) { + this(maxPendingProcessRecordsInput, maxByteSize, maxRecordsCount, maxRecordsPerCall, + getRecordsRetrievalStrategy, executorService, idleMillisBetweenCalls, + metricsFactory, operation, shardId, + DEFAULT_AWAIT_TERMINATION_TIMEOUT_MILLIS); } @Override @@ -265,10 +298,10 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { defaultGetRecordsCacheDaemon.isShutdown = true; executorService.shutdown(); try { - if (!executorService.awaitTermination(AWAIT_TERMINATION_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)) { + if (!executorService.awaitTermination(awaitTerminationTimeoutMillis, TimeUnit.MILLISECONDS)) { executorService.shutdownNow(); // Wait a while for tasks to respond to being cancelled - if (!executorService.awaitTermination(AWAIT_TERMINATION_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)) { + if (!executorService.awaitTermination(awaitTerminationTimeoutMillis, TimeUnit.MILLISECONDS)) { log.error("Executor service didn't terminate"); } } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherIntegrationTest.java index 3e8d865a..5d757a6c 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherIntegrationTest.java @@ -81,6 +81,7 @@ public class PrefetchRecordsPublisherIntegrationTest { private static final int MAX_RECORDS_COUNT = 30_000; private static final int MAX_RECORDS_PER_CALL = 10_000; private static final long IDLE_MILLIS_BETWEEN_CALLS = 500L; + private static final long AWAIT_TERMINATION_TIMEOUT = 1L; private static final MetricsFactory NULL_METRICS_FACTORY = new NullMetricsFactory(); private PrefetchRecordsPublisher getRecordsCache; @@ -121,7 +122,8 @@ public class PrefetchRecordsPublisherIntegrationTest { IDLE_MILLIS_BETWEEN_CALLS, new NullMetricsFactory(), operation, - "test-shard"); + "test-shard", + AWAIT_TERMINATION_TIMEOUT); } @Test @@ -174,7 +176,8 @@ public class PrefetchRecordsPublisherIntegrationTest { IDLE_MILLIS_BETWEEN_CALLS, new NullMetricsFactory(), operation, - "test-shard-2"); + "test-shard-2", + AWAIT_TERMINATION_TIMEOUT); getRecordsCache.start(extendedSequenceNumber, initialPosition); sleep(IDLE_MILLIS_BETWEEN_CALLS); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java index 11c16f3f..8264f89d 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java @@ -108,6 +108,7 @@ public class PrefetchRecordsPublisherTest { private static final int MAX_SIZE = 5; private static final int MAX_RECORDS_COUNT = 15000; private static final long IDLE_MILLIS_BETWEEN_CALLS = 0L; + private static final long AWAIT_TERMINATION_TIMEOUT = 1L; private static final String NEXT_SHARD_ITERATOR = "testNextShardIterator"; @Mock @@ -143,7 +144,8 @@ public class PrefetchRecordsPublisherTest { IDLE_MILLIS_BETWEEN_CALLS, new NullMetricsFactory(), operation, - "shardId"); + "shardId", + AWAIT_TERMINATION_TIMEOUT); spyQueue = spy(getRecordsCache.getPublisherSession().prefetchRecordsQueue()); records = spy(new ArrayList<>()); getRecordsResponse = GetRecordsResponse.builder().records(records).nextShardIterator(NEXT_SHARD_ITERATOR).childShards(new ArrayList<>()).build(); @@ -224,7 +226,8 @@ public class PrefetchRecordsPublisherTest { 1000, new NullMetricsFactory(), operation, - "shardId"); + "shardId", + AWAIT_TERMINATION_TIMEOUT); // Setup the retrieval strategy to fail initial calls before succeeding when(getRecordsRetrievalStrategy.getRecords(eq(MAX_RECORDS_PER_CALL))).thenThrow(new RetryableRetrievalException("Timed out")).thenThrow(new @@ -258,7 +261,8 @@ public class PrefetchRecordsPublisherTest { 1000, new NullMetricsFactory(), operation, - "shardId"); + "shardId", + AWAIT_TERMINATION_TIMEOUT); // Setup the retrieval strategy to fail initial calls before succeeding when(getRecordsRetrievalStrategy.getRecords(eq(MAX_RECORDS_PER_CALL))).thenThrow(new RetryableRetrievalException("Timed out")).thenThrow(new