From 8642f0b40b2afdbda145abee8cd63c7e42f896ca Mon Sep 17 00:00:00 2001 From: glarwood Date: Fri, 22 Mar 2019 17:58:14 +0000 Subject: [PATCH] feat(StreamRecordProcessingError): add new error to stop worker --- .../worker/StreamRecordProcessingError.java | 10 ++++++++++ .../clientlibrary/lib/worker/Worker.java | 19 ++++++++----------- 2 files changed, 18 insertions(+), 11 deletions(-) create mode 100644 src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/StreamRecordProcessingError.java diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/StreamRecordProcessingError.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/StreamRecordProcessingError.java new file mode 100644 index 00000000..c55152b6 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/StreamRecordProcessingError.java @@ -0,0 +1,10 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +public class StreamRecordProcessingError extends Error { + + public static final String PROCESS_RECORDS_ERROR_MESSAGE = "Failure to process records for shard id: "; + + public StreamRecordProcessingError(String shardId) { + super(PROCESS_RECORDS_ERROR_MESSAGE + shardId); + } +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 6f51f151..945a7938 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -14,8 +14,6 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; -import java.time.Duration; -import java.time.Instant; import java.util.*; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; @@ -31,12 +29,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; -import java.util.stream.Collectors; -import com.amazonaws.services.kinesis.leases.exceptions.DependencyException; -import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException; -import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException; -import com.amazonaws.services.kinesis.leases.impl.Lease; import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -86,9 +79,6 @@ public class Worker implements Runnable { private static final WorkerStateChangeListener DEFAULT_WORKER_STATE_CHANGE_LISTENER = new NoOpWorkerStateChangeListener(); private static final boolean DEFAULT_EXITS_ON_FAILURE = false; - // WARNING: dynamo connector depends upon this exact string - private static final String PROCESS_RECORDS_ERROR_MESSAGE = "Failure to process records"; - private WorkerLog wlog = new WorkerLog(); private final String applicationName; @@ -531,7 +521,7 @@ public class Worker implements Runnable { wlog.info("Sleeping ..."); Thread.sleep(idleTimeInMilliseconds); } catch (Exception e) { - if (e.getCause().getMessage().contains(PROCESS_RECORDS_ERROR_MESSAGE)) + if (causedByStreamRecordProcessingError(e)) throw new RuntimeException("Failing worker after irrecoverable failure in processing records"); if (exitOnFailure && retries.getAndIncrement() > MAX_RETRIES) throw new RuntimeException("Failing after " + MAX_RETRIES + " attempts", e); @@ -550,6 +540,13 @@ public class Worker implements Runnable { wlog.resetInfoLogging(); } + private boolean causedByStreamRecordProcessingError(Throwable t) { + if (t.getCause() == null) return false; + if (t.getCause().getClass().equals(StreamRecordProcessingError.class)) return true; + + return causedByStreamRecordProcessingError(t.getCause()); + } + private void initialize() { workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.INITIALIZING); boolean isDone = false;