From b105c40f91d33d0f235a7bddc1be9b6d629e16bc Mon Sep 17 00:00:00 2001 From: Phill Tomlinson Date: Thu, 7 Jan 2021 12:23:06 +0000 Subject: [PATCH] #328 Add support to pass worker process loop exceptions back to the client to enable error handling if required. --- .../clientlibrary/lib/worker/Worker.java | 17 +++++++++++++++++ .../WorkerProcessLoopExceptionListener.java | 15 +++++++++++++++ 2 files changed, 32 insertions(+) create mode 100644 src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerProcessLoopExceptionListener.java diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index f8c66181..d2a4c444 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -19,6 +19,7 @@ import java.util.HashSet; import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.ArrayList; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -106,6 +107,8 @@ public class Worker implements Runnable { private WorkerLog wlog = new WorkerLog(); + private List workerProcessLoopExceptionListeners = new ArrayList<>(); + private final String applicationName; private final IRecordProcessorFactory recordProcessorFactory; private final KinesisClientLibConfiguration config; @@ -704,6 +707,12 @@ public class Worker implements Runnable { } catch (Exception e) { LOG.error(String.format("Worker.run caught exception, sleeping for %s milli seconds!", String.valueOf(idleTimeInMilliseconds)), e); + + // notify the listeners that an error has occurred and pass down the exception for the client to handle + for (WorkerProcessLoopExceptionListener workerProcessLoopExceptionListener : workerProcessLoopExceptionListeners) { + workerProcessLoopExceptionListener.exceptionOccured(e); + } + try { Thread.sleep(idleTimeInMilliseconds); } catch (InterruptedException ex) { @@ -713,6 +722,14 @@ public class Worker implements Runnable { wlog.resetInfoLogging(); } + public void addWorkerProcessLoopExceptionObserver(final WorkerProcessLoopExceptionListener workerProcessLoopExceptionListener) { + workerProcessLoopExceptionListeners.add(workerProcessLoopExceptionListener); + } + + public void removeWorkerProcessLoopExceptionObserver(final WorkerProcessLoopExceptionListener workerProcessLoopExceptionListener) { + workerProcessLoopExceptionListeners.remove(workerProcessLoopExceptionListener); + } + private void initialize() { workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.INITIALIZING); boolean isDone = false; diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerProcessLoopExceptionListener.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerProcessLoopExceptionListener.java new file mode 100644 index 00000000..7444b45e --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerProcessLoopExceptionListener.java @@ -0,0 +1,15 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +/** + * A listener for when the worker processing loop encounters an unexpected exception + * so the client caller can inspect the error that occurred and take action. + */ +public interface WorkerProcessLoopExceptionListener { + + /** + * Listeners implement this method and can read the exception to take the appropriate action. + * + * @param e the exception that occurred + */ + void exceptionOccured(Exception e); +}