This commit is contained in:
philltomlinson 2025-04-05 00:26:07 +02:00 committed by GitHub
commit 1f142ed84c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 32 additions and 0 deletions

View file

@ -19,6 +19,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -106,6 +107,8 @@ public class Worker implements Runnable {
private WorkerLog wlog = new WorkerLog();
private List<WorkerProcessLoopExceptionListener> workerProcessLoopExceptionListeners = new ArrayList<>();
private final String applicationName;
private final IRecordProcessorFactory recordProcessorFactory;
private final KinesisClientLibConfiguration config;
@ -708,6 +711,12 @@ public class Worker implements Runnable {
} catch (Exception e) {
LOG.error(String.format("Worker.run caught exception, sleeping for %s milli seconds!",
String.valueOf(idleTimeInMilliseconds)), e);
// notify the listeners that an error has occurred and pass down the exception for the client to handle
for (WorkerProcessLoopExceptionListener workerProcessLoopExceptionListener : workerProcessLoopExceptionListeners) {
workerProcessLoopExceptionListener.exceptionOccured(e);
}
try {
Thread.sleep(idleTimeInMilliseconds);
} catch (InterruptedException ex) {
@ -717,6 +726,14 @@ public class Worker implements Runnable {
wlog.resetInfoLogging();
}
public void addWorkerProcessLoopExceptionObserver(final WorkerProcessLoopExceptionListener workerProcessLoopExceptionListener) {
workerProcessLoopExceptionListeners.add(workerProcessLoopExceptionListener);
}
public void removeWorkerProcessLoopExceptionObserver(final WorkerProcessLoopExceptionListener workerProcessLoopExceptionListener) {
workerProcessLoopExceptionListeners.remove(workerProcessLoopExceptionListener);
}
private void initialize() {
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.INITIALIZING);
boolean isDone = false;

View file

@ -0,0 +1,15 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
/**
* A listener for when the worker processing loop encounters an unexpected exception
* so the client caller can inspect the error that occurred and take action.
*/
public interface WorkerProcessLoopExceptionListener {
/**
* Listeners implement this method and can read the exception to take the appropriate action.
*
* @param e the exception that occurred
*/
void exceptionOccured(Exception e);
}