diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 150841c9..1bd420c3 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -19,6 +19,7 @@ import java.util.HashSet; import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.ArrayList; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -106,6 +107,8 @@ public class Worker implements Runnable { private WorkerLog wlog = new WorkerLog(); + private List workerProcessLoopExceptionListeners = new ArrayList<>(); + private final String applicationName; private final IRecordProcessorFactory recordProcessorFactory; private final KinesisClientLibConfiguration config; @@ -708,6 +711,12 @@ public class Worker implements Runnable { } catch (Exception e) { LOG.error(String.format("Worker.run caught exception, sleeping for %s milli seconds!", String.valueOf(idleTimeInMilliseconds)), e); + + // notify the listeners that an error has occurred and pass down the exception for the client to handle + for (WorkerProcessLoopExceptionListener workerProcessLoopExceptionListener : workerProcessLoopExceptionListeners) { + workerProcessLoopExceptionListener.exceptionOccured(e); + } + try { Thread.sleep(idleTimeInMilliseconds); } catch (InterruptedException ex) { @@ -717,6 +726,14 @@ public class Worker implements Runnable { wlog.resetInfoLogging(); } + public void addWorkerProcessLoopExceptionObserver(final WorkerProcessLoopExceptionListener workerProcessLoopExceptionListener) { + workerProcessLoopExceptionListeners.add(workerProcessLoopExceptionListener); + } + + public void removeWorkerProcessLoopExceptionObserver(final WorkerProcessLoopExceptionListener workerProcessLoopExceptionListener) { + workerProcessLoopExceptionListeners.remove(workerProcessLoopExceptionListener); + } + private void initialize() { workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.INITIALIZING); boolean isDone = false; diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerProcessLoopExceptionListener.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerProcessLoopExceptionListener.java new file mode 100644 index 00000000..7444b45e --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerProcessLoopExceptionListener.java @@ -0,0 +1,15 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +/** + * A listener for when the worker processing loop encounters an unexpected exception + * so the client caller can inspect the error that occurred and take action. + */ +public interface WorkerProcessLoopExceptionListener { + + /** + * Listeners implement this method and can read the exception to take the appropriate action. + * + * @param e the exception that occurred + */ + void exceptionOccured(Exception e); +}