Fixing issue with prefetch thread, where it kept on dying and falling behind. Catching throwable instead of error. Assigning thread name to the prefetch thread.

This commit is contained in:
Sahil Palvia 2017-10-03 15:29:38 -07:00
parent 585fa63508
commit ffdfe82b79
2 changed files with 13 additions and 3 deletions

View file

@ -110,9 +110,11 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
} }
private class DefaultGetRecordsCacheDaemon implements Runnable { private class DefaultGetRecordsCacheDaemon implements Runnable {
private volatile boolean isShutdown = false;
@Override @Override
public void run() { public void run() {
while (true) { while (!isShutdown) {
if (Thread.currentThread().isInterrupted()) { if (Thread.currentThread().isInterrupted()) {
log.warn("Prefetch thread was interrupted."); log.warn("Prefetch thread was interrupted.");
callShutdownOnStrategy(); callShutdownOnStrategy();
@ -132,7 +134,7 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
} catch (InterruptedException e) { } catch (InterruptedException e) {
log.info("Thread was interrupted, indicating shutdown was called on the cache."); log.info("Thread was interrupted, indicating shutdown was called on the cache.");
callShutdownOnStrategy(); callShutdownOnStrategy();
} catch (Error e) { } catch (Throwable e) {
log.error("Error was thrown while getting records, please check for the error", e); log.error("Error was thrown while getting records, please check for the error", e);
} }
} }
@ -143,6 +145,7 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
if (!getRecordsRetrievalStrategy.isShutdown()) { if (!getRecordsRetrievalStrategy.isShutdown()) {
getRecordsRetrievalStrategy.shutdown(); getRecordsRetrievalStrategy.shutdown();
} }
isShutdown = true;
} }
private void sleepBeforeNextCall() throws InterruptedException { private void sleepBeforeNextCall() throws InterruptedException {

View file

@ -16,6 +16,8 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.apachecommons.CommonsLog; import lombok.extern.apachecommons.CommonsLog;
@CommonsLog @CommonsLog
@ -37,7 +39,12 @@ public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory {
return new BlockingGetRecordsCache(maxRecords, getRecordsRetrievalStrategy, idleMillisBetweenCalls); return new BlockingGetRecordsCache(maxRecords, getRecordsRetrievalStrategy, idleMillisBetweenCalls);
} else { } else {
return new PrefetchGetRecordsCache(maxSize, maxByteSize, maxRecordsCount, maxRecords, return new PrefetchGetRecordsCache(maxSize, maxByteSize, maxRecordsCount, maxRecords,
getRecordsRetrievalStrategy, Executors.newFixedThreadPool(1), idleMillisBetweenCalls); getRecordsRetrievalStrategy,
Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("prefetch-get-records-cache-%d")
.build()),
idleMillisBetweenCalls);
} }
} }