diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java index 5c61132a..fde407b6 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java @@ -115,6 +115,7 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { while (true) { if (Thread.currentThread().isInterrupted()) { log.warn("Prefetch thread was interrupted."); + callShutdownOnStrategy(); break; } if (prefetchCounters.shouldGetNewRecords()) { @@ -129,8 +130,8 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { getRecordsResultQueue.put(processRecordsInput); prefetchCounters.added(processRecordsInput); } catch (InterruptedException e) { - log.info("Thread was interrupted, indicating shutdown was called on the cache. Calling shutdown on the GetRecordsRetrievalStrategy."); - getRecordsRetrievalStrategy.shutdown(); + log.info("Thread was interrupted, indicating shutdown was called on the cache."); + callShutdownOnStrategy(); } catch (Error e) { log.error("Error was thrown while getting records, please check for the error", e); } @@ -138,6 +139,12 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { } } + private void callShutdownOnStrategy() { + if (!getRecordsRetrievalStrategy.isShutdown()) { + getRecordsRetrievalStrategy.shutdown(); + } + } + private void sleepBeforeNextCall() throws InterruptedException { if (lastSuccessfulCall == null) { return;