Addressing comments from PR. Catching expected exceptions, changing log message for unexcepted exception. Changing threadname.

This commit is contained in:
Sahil Palvia 2017-10-04 11:14:55 -07:00
parent 80216b9a59
commit 5717bab9b6
2 changed files with 19 additions and 9 deletions

View file

@ -20,6 +20,7 @@ import java.time.Instant;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import com.amazonaws.SdkClientException;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
@ -27,7 +28,7 @@ import lombok.NonNull;
import lombok.extern.apachecommons.CommonsLog;
/**
* This is the default caching class, this class spins up a thread if prefetching is enabled. That thread fetches the
* This is the prefetch caching class, this class spins up a thread if prefetching is enabled. That thread fetches the
* next set of records and stores it in the cache. The size of the cache is limited by setting maxSize i.e. the maximum
* number of GetRecordsResult that the cache can store, maxByteSize i.e. the byte size of the records stored in the
* cache and maxRecordsCount i.e. the max number of records that should be present in the cache across multiple
@ -45,13 +46,17 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
private final ExecutorService executorService;
private final long idleMillisBetweenCalls;
private Instant lastSuccessfulCall;
private final DefaultGetRecordsCacheDaemon defaultGetRecordsCacheDaemon;
private PrefetchCounters prefetchCounters;
private boolean started = false;
/**
* Constructor
* Constructor for the PrefetchGetRecordsCache. This cache prefetches records from Kinesis and stores them in a
* LinkedBlockingQueue.
*
* @see com.amazonaws.services.kinesis.clientlibrary.lib.worker.PrefetchGetRecordsCache
*
* @param maxSize Max size of the queue in the cache
* @param maxByteSize Max byte size of the queue before blocking next get records call
@ -75,6 +80,7 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
this.prefetchCounters = new PrefetchCounters();
this.executorService = executorService;
this.idleMillisBetweenCalls = idleMillisBetweenCalls;
this.defaultGetRecordsCacheDaemon = new DefaultGetRecordsCacheDaemon();
}
@Override
@ -85,7 +91,7 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
if (!started) {
log.info("Starting prefetching thread.");
executorService.execute(new DefaultGetRecordsCacheDaemon());
executorService.execute(defaultGetRecordsCacheDaemon);
}
started = true;
}
@ -116,19 +122,19 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
@Override
public void shutdown() {
defaultGetRecordsCacheDaemon.isShutdown = true;
executorService.shutdownNow();
started = false;
}
private class DefaultGetRecordsCacheDaemon implements Runnable {
private volatile boolean isShutdown = false;
volatile boolean isShutdown = false;
@Override
public void run() {
while (!isShutdown) {
if (Thread.currentThread().isInterrupted()) {
log.warn("Prefetch thread was interrupted.");
callShutdownOnStrategy();
break;
}
if (prefetchCounters.shouldGetNewRecords()) {
@ -144,19 +150,23 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
prefetchCounters.added(processRecordsInput);
} catch (InterruptedException e) {
log.info("Thread was interrupted, indicating shutdown was called on the cache.");
callShutdownOnStrategy();
} catch (SdkClientException e) {
log.error("Exception thrown while fetching records from Kinesis", e);
} catch (Throwable e) {
log.error("Error was thrown while getting records, please check for the error", e);
log.error("Unexpected exception was thrown. This could probably be an issue or a bug." +
" Please search for the exception/error online to check what is going on. If the " +
"issue persists or is a recurring problem, feel free to open an issue on, " +
"https://github.com/awslabs/amazon-kinesis-client.", e);
}
}
}
callShutdownOnStrategy();
}
private void callShutdownOnStrategy() {
if (!getRecordsRetrievalStrategy.isShutdown()) {
getRecordsRetrievalStrategy.shutdown();
}
isShutdown = true;
}
private void sleepBeforeNextCall() throws InterruptedException {

View file

@ -42,7 +42,7 @@ public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory {
getRecordsRetrievalStrategy,
Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("prefetch-get-records-cache-" + shardId + "-%d")
.setNameFormat("prefetch-cache-" + shardId + "-%04d")
.build()),
idleMillisBetweenCalls);
}