fix(worker): fail on TrimmedDataAccessException (101797)

This commit is contained in:
bencvdb 2021-07-01 09:57:34 -07:00
parent 6b287424d9
commit c8a1797b7e

View file

@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer; import java.util.function.Consumer;
import com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException; import com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException;
import com.amazonaws.services.dynamodbv2.model.TrimmedDataAccessException;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -526,6 +527,8 @@ public class Worker implements Runnable {
} catch (AmazonDynamoDBException e) { } catch (AmazonDynamoDBException e) {
throw e; throw e;
} catch (Exception e) { } catch (Exception e) {
handleTrimmedDataAccessException(e);
if (causedByStreamRecordProcessingError(e)) if (causedByStreamRecordProcessingError(e))
throw new RuntimeException("Failing worker after irrecoverable failure: " + e.getMessage()); throw new RuntimeException("Failing worker after irrecoverable failure: " + e.getMessage());
if (exitOnFailure && retries.getAndIncrement() > MAX_RETRIES) if (exitOnFailure && retries.getAndIncrement() > MAX_RETRIES)
@ -552,6 +555,18 @@ public class Worker implements Runnable {
return causedByStreamRecordProcessingError(t.getCause()); return causedByStreamRecordProcessingError(t.getCause());
} }
private void handleTrimmedDataAccessException(Exception e) {
Optional<TrimmedDataAccessException> maybeTrimmedException = getTrimmedDataAccessException(e);
if (maybeTrimmedException.isPresent()) throw maybeTrimmedException.get();
}
private Optional<TrimmedDataAccessException> getTrimmedDataAccessException(Throwable t) {
if (t.getCause() == null) return Optional.empty();
if (t.getCause().getClass().equals(TrimmedDataAccessException.class)) return Optional.of( (TrimmedDataAccessException) t.getCause());
return getTrimmedDataAccessException(t.getCause());
}
private void initialize() { private void initialize() {
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.INITIALIZING); workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.INITIALIZING);
boolean isDone = false; boolean isDone = false;