Add awaitTerminationTimeoutMillis as paramter for PrefetchRecordsPublisher

Since clients can configure there own awaitTerminationTimeoutMillis,
add it as sepearate parameter with default value
This commit is contained in:
Monishkumar Gajendran 2021-10-06 16:31:07 -07:00
parent b72b9b01ca
commit 9445847447
3 changed files with 49 additions and 9 deletions

View file

@ -79,7 +79,7 @@ import static software.amazon.kinesis.common.DiagnosticUtils.takeDelayedDelivery
public class PrefetchRecordsPublisher implements RecordsPublisher { public class PrefetchRecordsPublisher implements RecordsPublisher {
private static final String EXPIRED_ITERATOR_METRIC = "ExpiredIterator"; private static final String EXPIRED_ITERATOR_METRIC = "ExpiredIterator";
// Since this package is being used by all KCL clients keeping the upper threshold of 60 seconds // Since this package is being used by all KCL clients keeping the upper threshold of 60 seconds
private static final Duration AWAIT_TERMINATION_TIMEOUT = Duration.ofSeconds(60); private static final long DEFAULT_AWAIT_TERMINATION_TIMEOUT_MILLIS = 60_000L;
private int maxPendingProcessRecordsInput; private int maxPendingProcessRecordsInput;
private int maxByteSize; private int maxByteSize;
@ -96,6 +96,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
private final String operation; private final String operation;
private final StreamIdentifier streamId; private final StreamIdentifier streamId;
private final String streamAndShardId; private final String streamAndShardId;
private final long awaitTerminationTimeoutMillis;
private Subscriber<? super RecordsRetrieved> subscriber; private Subscriber<? super RecordsRetrieved> subscriber;
@VisibleForTesting @Getter @VisibleForTesting @Getter
private final PublisherSession publisherSession; private final PublisherSession publisherSession;
@ -204,6 +205,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
* @param getRecordsRetrievalStrategy Retrieval strategy for the get records call * @param getRecordsRetrievalStrategy Retrieval strategy for the get records call
* @param executorService Executor service for the cache * @param executorService Executor service for the cache
* @param idleMillisBetweenCalls maximum time to wait before dispatching the next get records call * @param idleMillisBetweenCalls maximum time to wait before dispatching the next get records call
* @param awaitTerminationTimeoutMillis maximum time to wait for graceful shutdown of executorService
*/ */
public PrefetchRecordsPublisher(final int maxPendingProcessRecordsInput, final int maxByteSize, final int maxRecordsCount, public PrefetchRecordsPublisher(final int maxPendingProcessRecordsInput, final int maxByteSize, final int maxRecordsCount,
final int maxRecordsPerCall, final int maxRecordsPerCall,
@ -212,7 +214,8 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
final long idleMillisBetweenCalls, final long idleMillisBetweenCalls,
@NonNull final MetricsFactory metricsFactory, @NonNull final MetricsFactory metricsFactory,
@NonNull final String operation, @NonNull final String operation,
@NonNull final String shardId) { @NonNull final String shardId,
final long awaitTerminationTimeoutMillis) {
this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy; this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy;
this.maxRecordsPerCall = maxRecordsPerCall; this.maxRecordsPerCall = maxRecordsPerCall;
this.maxPendingProcessRecordsInput = maxPendingProcessRecordsInput; this.maxPendingProcessRecordsInput = maxPendingProcessRecordsInput;
@ -228,6 +231,36 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
this.operation = operation; this.operation = operation;
this.streamId = this.getRecordsRetrievalStrategy.dataFetcher().getStreamIdentifier(); this.streamId = this.getRecordsRetrievalStrategy.dataFetcher().getStreamIdentifier();
this.streamAndShardId = this.streamId.serialize() + ":" + shardId; this.streamAndShardId = this.streamId.serialize() + ":" + shardId;
this.awaitTerminationTimeoutMillis = awaitTerminationTimeoutMillis;
}
/**
* Constructor for the PrefetchRecordsPublisher. This cache prefetches records from Kinesis and stores them in a
* LinkedBlockingQueue.
*
* @see PrefetchRecordsPublisher
*
* @param maxPendingProcessRecordsInput Max number of ProcessRecordsInput that can be held in the cache before
* blocking
* @param maxByteSize Max byte size of the queue before blocking next get records call
* @param maxRecordsCount Max number of records in the queue across all ProcessRecordInput objects
* @param maxRecordsPerCall Max records to be returned per call
* @param getRecordsRetrievalStrategy Retrieval strategy for the get records call
* @param executorService Executor service for the cache
* @param idleMillisBetweenCalls maximum time to wait before dispatching the next get records call
*/
public PrefetchRecordsPublisher(final int maxPendingProcessRecordsInput, final int maxByteSize, final int maxRecordsCount,
final int maxRecordsPerCall,
final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy,
final ExecutorService executorService,
final long idleMillisBetweenCalls,
final MetricsFactory metricsFactory,
final String operation,
final String shardId) {
this(maxPendingProcessRecordsInput, maxByteSize, maxRecordsCount, maxRecordsPerCall,
getRecordsRetrievalStrategy, executorService, idleMillisBetweenCalls,
metricsFactory, operation, shardId,
DEFAULT_AWAIT_TERMINATION_TIMEOUT_MILLIS);
} }
@Override @Override
@ -265,10 +298,10 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
defaultGetRecordsCacheDaemon.isShutdown = true; defaultGetRecordsCacheDaemon.isShutdown = true;
executorService.shutdown(); executorService.shutdown();
try { try {
if (!executorService.awaitTermination(AWAIT_TERMINATION_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)) { if (!executorService.awaitTermination(awaitTerminationTimeoutMillis, TimeUnit.MILLISECONDS)) {
executorService.shutdownNow(); executorService.shutdownNow();
// Wait a while for tasks to respond to being cancelled // Wait a while for tasks to respond to being cancelled
if (!executorService.awaitTermination(AWAIT_TERMINATION_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)) { if (!executorService.awaitTermination(awaitTerminationTimeoutMillis, TimeUnit.MILLISECONDS)) {
log.error("Executor service didn't terminate"); log.error("Executor service didn't terminate");
} }
} }

View file

@ -81,6 +81,7 @@ public class PrefetchRecordsPublisherIntegrationTest {
private static final int MAX_RECORDS_COUNT = 30_000; private static final int MAX_RECORDS_COUNT = 30_000;
private static final int MAX_RECORDS_PER_CALL = 10_000; private static final int MAX_RECORDS_PER_CALL = 10_000;
private static final long IDLE_MILLIS_BETWEEN_CALLS = 500L; private static final long IDLE_MILLIS_BETWEEN_CALLS = 500L;
private static final long AWAIT_TERMINATION_TIMEOUT = 1L;
private static final MetricsFactory NULL_METRICS_FACTORY = new NullMetricsFactory(); private static final MetricsFactory NULL_METRICS_FACTORY = new NullMetricsFactory();
private PrefetchRecordsPublisher getRecordsCache; private PrefetchRecordsPublisher getRecordsCache;
@ -121,7 +122,8 @@ public class PrefetchRecordsPublisherIntegrationTest {
IDLE_MILLIS_BETWEEN_CALLS, IDLE_MILLIS_BETWEEN_CALLS,
new NullMetricsFactory(), new NullMetricsFactory(),
operation, operation,
"test-shard"); "test-shard",
AWAIT_TERMINATION_TIMEOUT);
} }
@Test @Test
@ -174,7 +176,8 @@ public class PrefetchRecordsPublisherIntegrationTest {
IDLE_MILLIS_BETWEEN_CALLS, IDLE_MILLIS_BETWEEN_CALLS,
new NullMetricsFactory(), new NullMetricsFactory(),
operation, operation,
"test-shard-2"); "test-shard-2",
AWAIT_TERMINATION_TIMEOUT);
getRecordsCache.start(extendedSequenceNumber, initialPosition); getRecordsCache.start(extendedSequenceNumber, initialPosition);
sleep(IDLE_MILLIS_BETWEEN_CALLS); sleep(IDLE_MILLIS_BETWEEN_CALLS);

View file

@ -108,6 +108,7 @@ public class PrefetchRecordsPublisherTest {
private static final int MAX_SIZE = 5; private static final int MAX_SIZE = 5;
private static final int MAX_RECORDS_COUNT = 15000; private static final int MAX_RECORDS_COUNT = 15000;
private static final long IDLE_MILLIS_BETWEEN_CALLS = 0L; private static final long IDLE_MILLIS_BETWEEN_CALLS = 0L;
private static final long AWAIT_TERMINATION_TIMEOUT = 1L;
private static final String NEXT_SHARD_ITERATOR = "testNextShardIterator"; private static final String NEXT_SHARD_ITERATOR = "testNextShardIterator";
@Mock @Mock
@ -143,7 +144,8 @@ public class PrefetchRecordsPublisherTest {
IDLE_MILLIS_BETWEEN_CALLS, IDLE_MILLIS_BETWEEN_CALLS,
new NullMetricsFactory(), new NullMetricsFactory(),
operation, operation,
"shardId"); "shardId",
AWAIT_TERMINATION_TIMEOUT);
spyQueue = spy(getRecordsCache.getPublisherSession().prefetchRecordsQueue()); spyQueue = spy(getRecordsCache.getPublisherSession().prefetchRecordsQueue());
records = spy(new ArrayList<>()); records = spy(new ArrayList<>());
getRecordsResponse = GetRecordsResponse.builder().records(records).nextShardIterator(NEXT_SHARD_ITERATOR).childShards(new ArrayList<>()).build(); getRecordsResponse = GetRecordsResponse.builder().records(records).nextShardIterator(NEXT_SHARD_ITERATOR).childShards(new ArrayList<>()).build();
@ -224,7 +226,8 @@ public class PrefetchRecordsPublisherTest {
1000, 1000,
new NullMetricsFactory(), new NullMetricsFactory(),
operation, operation,
"shardId"); "shardId",
AWAIT_TERMINATION_TIMEOUT);
// Setup the retrieval strategy to fail initial calls before succeeding // Setup the retrieval strategy to fail initial calls before succeeding
when(getRecordsRetrievalStrategy.getRecords(eq(MAX_RECORDS_PER_CALL))).thenThrow(new when(getRecordsRetrievalStrategy.getRecords(eq(MAX_RECORDS_PER_CALL))).thenThrow(new
RetryableRetrievalException("Timed out")).thenThrow(new RetryableRetrievalException("Timed out")).thenThrow(new
@ -258,7 +261,8 @@ public class PrefetchRecordsPublisherTest {
1000, 1000,
new NullMetricsFactory(), new NullMetricsFactory(),
operation, operation,
"shardId"); "shardId",
AWAIT_TERMINATION_TIMEOUT);
// Setup the retrieval strategy to fail initial calls before succeeding // Setup the retrieval strategy to fail initial calls before succeeding
when(getRecordsRetrievalStrategy.getRecords(eq(MAX_RECORDS_PER_CALL))).thenThrow(new when(getRecordsRetrievalStrategy.getRecords(eq(MAX_RECORDS_PER_CALL))).thenThrow(new
RetryableRetrievalException("Timed out")).thenThrow(new RetryableRetrievalException("Timed out")).thenThrow(new