diff --git a/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index ee9a2f21..f4e2d0fc 100644 --- a/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/fivetran/external/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException; +import com.amazonaws.services.dynamodbv2.model.TrimmedDataAccessException; import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -526,6 +527,8 @@ public class Worker implements Runnable { } catch (AmazonDynamoDBException e) { throw e; } catch (Exception e) { + handleTrimmedDataAccessException(e); + if (causedByStreamRecordProcessingError(e)) throw new RuntimeException("Failing worker after irrecoverable failure: " + e.getMessage()); if (exitOnFailure && retries.getAndIncrement() > MAX_RETRIES) @@ -552,6 +555,18 @@ public class Worker implements Runnable { return causedByStreamRecordProcessingError(t.getCause()); } + private void handleTrimmedDataAccessException(Exception e) { + Optional maybeTrimmedException = getTrimmedDataAccessException(e); + if (maybeTrimmedException.isPresent()) throw maybeTrimmedException.get(); + } + + private Optional getTrimmedDataAccessException(Throwable t) { + if (t.getCause() == null) return Optional.empty(); + if (t.getCause().getClass().equals(TrimmedDataAccessException.class)) return Optional.of( (TrimmedDataAccessException) t.getCause()); + + return getTrimmedDataAccessException(t.getCause()); + } + private void initialize() { workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.INITIALIZING); boolean isDone = false;