refactor(Worker): allow worker to fail after error in processing records
This commit is contained in:
parent
913f2e9377
commit
f0230c9040
1 changed files with 5 additions and 0 deletions
|
|
@ -86,6 +86,9 @@ public class Worker implements Runnable {
|
||||||
private static final WorkerStateChangeListener DEFAULT_WORKER_STATE_CHANGE_LISTENER = new NoOpWorkerStateChangeListener();
|
private static final WorkerStateChangeListener DEFAULT_WORKER_STATE_CHANGE_LISTENER = new NoOpWorkerStateChangeListener();
|
||||||
private static final boolean DEFAULT_EXITS_ON_FAILURE = false;
|
private static final boolean DEFAULT_EXITS_ON_FAILURE = false;
|
||||||
|
|
||||||
|
// WARNING: dynamo connector depends upon this exact string
|
||||||
|
private static final String PROCESS_RECORDS_ERROR_MESSAGE = "Failure to process records";
|
||||||
|
|
||||||
private WorkerLog wlog = new WorkerLog();
|
private WorkerLog wlog = new WorkerLog();
|
||||||
|
|
||||||
private final String applicationName;
|
private final String applicationName;
|
||||||
|
|
@ -528,6 +531,8 @@ public class Worker implements Runnable {
|
||||||
wlog.info("Sleeping ...");
|
wlog.info("Sleeping ...");
|
||||||
Thread.sleep(idleTimeInMilliseconds);
|
Thread.sleep(idleTimeInMilliseconds);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
if (e.getCause().getMessage().contains(PROCESS_RECORDS_ERROR_MESSAGE))
|
||||||
|
throw new RuntimeException("Failing worker after irrecoverable failure in processing records");
|
||||||
if (exitOnFailure && retries.getAndIncrement() > MAX_RETRIES)
|
if (exitOnFailure && retries.getAndIncrement() > MAX_RETRIES)
|
||||||
throw new RuntimeException("Failing after " + MAX_RETRIES + " attempts", e);
|
throw new RuntimeException("Failing after " + MAX_RETRIES + " attempts", e);
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue