Fix to shut down PrefetchRecordsPublisher in a graceful manner
Previously, when the lease expired, PrefetchRecordsPublisher shut down the process forcefully by interrupting its threads, which led to a leak of Apache HTTP client connections. The code is now changed to shut down the PrefetchRecordsPublisher process in a more graceful manner and to handle InterruptedException.
This commit is contained in:
parent
2447513de3
commit
b5801edc1d
1 changed files with 24 additions and 1 deletions
|
|
@ -78,6 +78,9 @@ import static software.amazon.kinesis.common.DiagnosticUtils.takeDelayedDelivery
|
|||
@KinesisClientInternalApi
|
||||
public class PrefetchRecordsPublisher implements RecordsPublisher {
|
||||
private static final String EXPIRED_ITERATOR_METRIC = "ExpiredIterator";
|
||||
// Since this package is being used by all KCL clients keeping the upper threshold of 60 seconds
|
||||
private static final Duration AWAIT_TERMINATION_TIMEOUT = Duration.ofSeconds(60);
|
||||
|
||||
private int maxPendingProcessRecordsInput;
|
||||
private int maxByteSize;
|
||||
private int maxRecordsCount;
|
||||
|
|
@ -260,6 +263,21 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
|
|||
@Override
|
||||
public void shutdown() {
|
||||
defaultGetRecordsCacheDaemon.isShutdown = true;
|
||||
executorService.shutdown();
|
||||
try {
|
||||
if (!executorService.awaitTermination(AWAIT_TERMINATION_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)) {
|
||||
executorService.shutdownNow();
|
||||
// Wait a while for tasks to respond to being cancelled
|
||||
if (!executorService.awaitTermination(AWAIT_TERMINATION_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)) {
|
||||
log.error("Executor service didn't terminate");
|
||||
}
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
// Preserve interrupt status
|
||||
Thread.currentThread().interrupt();
|
||||
// (Re-)Cancel if current thread also interrupted
|
||||
executorService.shutdownNow();
|
||||
}
|
||||
executorService.shutdownNow();
|
||||
started = false;
|
||||
}
|
||||
|
|
@ -409,12 +427,15 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
|
|||
break;
|
||||
}
|
||||
|
||||
resetLock.readLock().lock();
|
||||
try {
|
||||
resetLock.readLock().lock();
|
||||
makeRetrievalAttempt();
|
||||
} catch(PositionResetException pre) {
|
||||
log.debug("{} : Position was reset while attempting to add item to queue.", streamAndShardId);
|
||||
} catch (Throwable e) {
|
||||
if (e instanceof InterruptedException) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
log.error("{} : Unexpected exception was thrown. This could probably be an issue or a bug." +
|
||||
" Please search for the exception/error online to check what is going on. If the " +
|
||||
"issue persists or is a recurring problem, feel free to open an issue on, " +
|
||||
|
|
@ -456,6 +477,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
|
|||
} catch (RetryableRetrievalException rre) {
|
||||
log.info("{} : Timeout occurred while waiting for response from Kinesis. Will retry the request.", streamAndShardId);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
log.info("{} : Thread was interrupted, indicating shutdown was called on the cache.", streamAndShardId);
|
||||
} catch (ExpiredIteratorException e) {
|
||||
log.info("{} : records threw ExpiredIteratorException - restarting"
|
||||
|
|
@ -482,6 +504,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
|
|||
try {
|
||||
publisherSession.prefetchCounters().waitForConsumer();
|
||||
} catch (InterruptedException ie) {
|
||||
Thread.currentThread().interrupt();
|
||||
log.info("{} : Thread was interrupted while waiting for the consumer. " +
|
||||
"Shutdown has probably been started", streamAndShardId);
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue