#328 Add support to pass worker process loop exceptions back to the client to enable error handling if required.
This commit is contained in:
parent
6fbfc21ad7
commit
b105c40f91
2 changed files with 32 additions and 0 deletions
|
|
@ -19,6 +19,7 @@ import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
|
@ -106,6 +107,8 @@ public class Worker implements Runnable {
|
||||||
|
|
||||||
private WorkerLog wlog = new WorkerLog();
|
private WorkerLog wlog = new WorkerLog();
|
||||||
|
|
||||||
|
private List<WorkerProcessLoopExceptionListener> workerProcessLoopExceptionListeners = new ArrayList<>();
|
||||||
|
|
||||||
private final String applicationName;
|
private final String applicationName;
|
||||||
private final IRecordProcessorFactory recordProcessorFactory;
|
private final IRecordProcessorFactory recordProcessorFactory;
|
||||||
private final KinesisClientLibConfiguration config;
|
private final KinesisClientLibConfiguration config;
|
||||||
|
|
@ -704,6 +707,12 @@ public class Worker implements Runnable {
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.error(String.format("Worker.run caught exception, sleeping for %s milli seconds!",
|
LOG.error(String.format("Worker.run caught exception, sleeping for %s milli seconds!",
|
||||||
String.valueOf(idleTimeInMilliseconds)), e);
|
String.valueOf(idleTimeInMilliseconds)), e);
|
||||||
|
|
||||||
|
// notify the listeners that an error has occurred and pass down the exception for the client to handle
|
||||||
|
for (WorkerProcessLoopExceptionListener workerProcessLoopExceptionListener : workerProcessLoopExceptionListeners) {
|
||||||
|
workerProcessLoopExceptionListener.exceptionOccured(e);
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
Thread.sleep(idleTimeInMilliseconds);
|
Thread.sleep(idleTimeInMilliseconds);
|
||||||
} catch (InterruptedException ex) {
|
} catch (InterruptedException ex) {
|
||||||
|
|
@ -713,6 +722,14 @@ public class Worker implements Runnable {
|
||||||
wlog.resetInfoLogging();
|
wlog.resetInfoLogging();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void addWorkerProcessLoopExceptionObserver(final WorkerProcessLoopExceptionListener workerProcessLoopExceptionListener) {
|
||||||
|
workerProcessLoopExceptionListeners.add(workerProcessLoopExceptionListener);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void removeWorkerProcessLoopExceptionObserver(final WorkerProcessLoopExceptionListener workerProcessLoopExceptionListener) {
|
||||||
|
workerProcessLoopExceptionListeners.remove(workerProcessLoopExceptionListener);
|
||||||
|
}
|
||||||
|
|
||||||
private void initialize() {
|
private void initialize() {
|
||||||
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.INITIALIZING);
|
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.INITIALIZING);
|
||||||
boolean isDone = false;
|
boolean isDone = false;
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,15 @@
|
||||||
|
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A listener for when the worker processing loop encounters an unexpected exception
|
||||||
|
* so the client caller can inspect the error that occurred and take action.
|
||||||
|
*/
|
||||||
|
public interface WorkerProcessLoopExceptionListener {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Listeners implement this method and can read the exception to take the appropriate action.
|
||||||
|
*
|
||||||
|
* @param e the exception that occurred
|
||||||
|
*/
|
||||||
|
void exceptionOccured(Exception e);
|
||||||
|
}
|
||||||
Loading…
Reference in a new issue