diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java index b3d4b6f6..90d3ab07 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java @@ -20,6 +20,7 @@ import java.time.Instant; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; +import com.amazonaws.SdkClientException; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.model.GetRecordsResult; @@ -27,7 +28,7 @@ import lombok.NonNull; import lombok.extern.apachecommons.CommonsLog; /** - * This is the default caching class, this class spins up a thread if prefetching is enabled. That thread fetches the + * This is the prefetch caching class, this class spins up a thread if prefetching is enabled. That thread fetches the * next set of records and stores it in the cache. The size of the cache is limited by setting maxSize i.e. the maximum * number of GetRecordsResult that the cache can store, maxByteSize i.e. the byte size of the records stored in the * cache and maxRecordsCount i.e. the max number of records that should be present in the cache across multiple @@ -45,13 +46,17 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { private final ExecutorService executorService; private final long idleMillisBetweenCalls; private Instant lastSuccessfulCall; + private final DefaultGetRecordsCacheDaemon defaultGetRecordsCacheDaemon; private PrefetchCounters prefetchCounters; private boolean started = false; /** - * Constructor + * Constructor for the PrefetchGetRecordsCache. This cache prefetches records from Kinesis and stores them in a + * LinkedBlockingQueue. + * + * @see com.amazonaws.services.kinesis.clientlibrary.lib.worker.PrefetchGetRecordsCache * * @param maxSize Max size of the queue in the cache * @param maxByteSize Max byte size of the queue before blocking next get records call @@ -75,6 +80,7 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { this.prefetchCounters = new PrefetchCounters(); this.executorService = executorService; this.idleMillisBetweenCalls = idleMillisBetweenCalls; + this.defaultGetRecordsCacheDaemon = new DefaultGetRecordsCacheDaemon(); } @Override @@ -85,7 +91,7 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { if (!started) { log.info("Starting prefetching thread."); - executorService.execute(new DefaultGetRecordsCacheDaemon()); + executorService.execute(defaultGetRecordsCacheDaemon); } started = true; } @@ -116,19 +122,19 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { @Override public void shutdown() { + defaultGetRecordsCacheDaemon.isShutdown = true; executorService.shutdownNow(); started = false; } private class DefaultGetRecordsCacheDaemon implements Runnable { - private volatile boolean isShutdown = false; + volatile boolean isShutdown = false; @Override public void run() { while (!isShutdown) { if (Thread.currentThread().isInterrupted()) { log.warn("Prefetch thread was interrupted."); - callShutdownOnStrategy(); break; } if (prefetchCounters.shouldGetNewRecords()) { @@ -144,19 +150,23 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { prefetchCounters.added(processRecordsInput); } catch (InterruptedException e) { log.info("Thread was interrupted, indicating shutdown was called on the cache."); - callShutdownOnStrategy(); + } catch (SdkClientException e) { + log.error("Exception thrown while fetching records from Kinesis", e); } catch (Throwable e) { - log.error("Error was thrown while getting records, please check for the error", e); + log.error("Unexpected exception was thrown. This could probably be an issue or a bug." + + " Please search for the exception/error online to check what is going on. If the " + + "issue persists or is a recurring problem, feel free to open an issue on, " + + "https://github.com/awslabs/amazon-kinesis-client.", e); } } } + callShutdownOnStrategy(); } private void callShutdownOnStrategy() { if (!getRecordsRetrievalStrategy.isShutdown()) { getRecordsRetrievalStrategy.shutdown(); } - isShutdown = true; } private void sleepBeforeNextCall() throws InterruptedException { diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java index 0a577ada..c9f10492 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java @@ -42,7 +42,7 @@ public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory { getRecordsRetrievalStrategy, Executors.newFixedThreadPool(1, new ThreadFactoryBuilder() .setDaemon(true) - .setNameFormat("prefetch-get-records-cache-" + shardId + "-%d") + .setNameFormat("prefetch-cache-" + shardId + "-%04d") .build()), idleMillisBetweenCalls); }