From ffdfe82b79e0a2f5901bb868ce3b67adf2f2e311 Mon Sep 17 00:00:00 2001 From: Sahil Palvia Date: Tue, 3 Oct 2017 15:29:38 -0700 Subject: [PATCH] Fixing issue with prefetch thread, where it kept on dying and falling behind. Catching throwable instead of error. Assigning thread name to the prefetch thread. --- .../lib/worker/PrefetchGetRecordsCache.java | 7 +++++-- .../lib/worker/SimpleRecordsFetcherFactory.java | 9 ++++++++- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java index fde407b6..cf76442e 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java @@ -110,9 +110,11 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { } private class DefaultGetRecordsCacheDaemon implements Runnable { + private volatile boolean isShutdown = false; + @Override public void run() { - while (true) { + while (!isShutdown) { if (Thread.currentThread().isInterrupted()) { log.warn("Prefetch thread was interrupted."); callShutdownOnStrategy(); @@ -132,7 +134,7 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { } catch (InterruptedException e) { log.info("Thread was interrupted, indicating shutdown was called on the cache."); callShutdownOnStrategy(); - } catch (Error e) { + } catch (Throwable e) { log.error("Error was thrown while getting records, please check for the error", e); } } @@ -143,6 +145,7 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { if (!getRecordsRetrievalStrategy.isShutdown()) { getRecordsRetrievalStrategy.shutdown(); } + isShutdown = true; } private void sleepBeforeNextCall() throws InterruptedException { diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java index 2b6e4e83..cde0f53f 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java @@ -16,6 +16,8 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import java.util.concurrent.Executors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + import lombok.extern.apachecommons.CommonsLog; @CommonsLog @@ -37,7 +39,12 @@ public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory { return new BlockingGetRecordsCache(maxRecords, getRecordsRetrievalStrategy, idleMillisBetweenCalls); } else { return new PrefetchGetRecordsCache(maxSize, maxByteSize, maxRecordsCount, maxRecords, - getRecordsRetrievalStrategy, Executors.newFixedThreadPool(1), idleMillisBetweenCalls); + getRecordsRetrievalStrategy, + Executors.newFixedThreadPool(1, new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("prefetch-get-records-cache-%d") + .build()), + idleMillisBetweenCalls); } }