diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index bcca1a42..6f51f151 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -86,6 +86,9 @@ public class Worker implements Runnable { private static final WorkerStateChangeListener DEFAULT_WORKER_STATE_CHANGE_LISTENER = new NoOpWorkerStateChangeListener(); private static final boolean DEFAULT_EXITS_ON_FAILURE = false; + // WARNING: dynamo connector depends upon this exact string + private static final String PROCESS_RECORDS_ERROR_MESSAGE = "Failure to process records"; + private WorkerLog wlog = new WorkerLog(); private final String applicationName; @@ -528,6 +531,8 @@ public class Worker implements Runnable { wlog.info("Sleeping ..."); Thread.sleep(idleTimeInMilliseconds); } catch (Exception e) { + if (e.getCause().getMessage().contains(PROCESS_RECORDS_ERROR_MESSAGE)) + throw new RuntimeException("Failing worker after irrecoverable failure in processing records"); if (exitOnFailure && retries.getAndIncrement() > MAX_RETRIES) throw new RuntimeException("Failing after " + MAX_RETRIES + " attempts", e);