From c8a1797b7eed474cb7fe91f9622a0a04b60841ca Mon Sep 17 00:00:00 2001 From: bencvdb Date: Thu, 1 Jul 2021 09:57:34 -0700 Subject: [PATCH] fix(worker): fail on TrimmedDataAccessException (101797) --- .../kinesis/clientlibrary/lib/worker/Worker.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index ee9a2f21..f4e2d0fc 100644 --- a/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException; +import com.amazonaws.services.dynamodbv2.model.TrimmedDataAccessException; import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -526,6 +527,8 @@ public class Worker implements Runnable { } catch (AmazonDynamoDBException e) { throw e; } catch (Exception e) { + handleTrimmedDataAccessException(e); + if (causedByStreamRecordProcessingError(e)) throw new RuntimeException("Failing worker after irrecoverable failure: " + e.getMessage()); if (exitOnFailure && retries.getAndIncrement() > MAX_RETRIES) @@ -552,6 +555,18 @@ public class Worker implements Runnable { return causedByStreamRecordProcessingError(t.getCause()); } + private void handleTrimmedDataAccessException(Exception e) { + Optional maybeTrimmedException = getTrimmedDataAccessException(e); + if (maybeTrimmedException.isPresent()) throw maybeTrimmedException.get(); + } + + private Optional getTrimmedDataAccessException(Throwable t) { + if (t.getCause() == null) return Optional.empty(); + if (t.getCause().getClass().equals(TrimmedDataAccessException.class)) return Optional.of( (TrimmedDataAccessException) t.getCause()); + + return getTrimmedDataAccessException(t.getCause()); + } + private void initialize() { workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.INITIALIZING); boolean isDone = false;