Merge pull request #6 from fivetran/85045-worker-improvements

feature(worker): missing data causes failure
This commit is contained in:
bencvdb 2021-07-07 08:36:14 -07:00 committed by GitHub
commit 47f68875ab
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer; import java.util.function.Consumer;
import com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException; import com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException;
import com.amazonaws.services.dynamodbv2.model.TrimmedDataAccessException;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -526,6 +527,8 @@ public class Worker implements Runnable {
} catch (AmazonDynamoDBException e) { } catch (AmazonDynamoDBException e) {
throw e; throw e;
} catch (Exception e) { } catch (Exception e) {
handleTrimmedDataAccessException(e);
if (causedByStreamRecordProcessingError(e)) if (causedByStreamRecordProcessingError(e))
throw new RuntimeException("Failing worker after irrecoverable failure: " + e.getMessage()); throw new RuntimeException("Failing worker after irrecoverable failure: " + e.getMessage());
if (exitOnFailure && retries.getAndIncrement() > MAX_RETRIES) if (exitOnFailure && retries.getAndIncrement() > MAX_RETRIES)
@ -552,6 +555,18 @@ public class Worker implements Runnable {
return causedByStreamRecordProcessingError(t.getCause()); return causedByStreamRecordProcessingError(t.getCause());
} }
private void handleTrimmedDataAccessException(Exception e) {
Optional<TrimmedDataAccessException> maybeTrimmedException = getTrimmedDataAccessException(e);
if (maybeTrimmedException.isPresent()) throw maybeTrimmedException.get();
}
private Optional<TrimmedDataAccessException> getTrimmedDataAccessException(Throwable t) {
if (t.getCause() == null) return Optional.empty();
if (t.getCause().getClass().equals(TrimmedDataAccessException.class)) return Optional.of( (TrimmedDataAccessException) t.getCause());
return getTrimmedDataAccessException(t.getCause());
}
private void initialize() { private void initialize() {
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.INITIALIZING); workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.INITIALIZING);
boolean isDone = false; boolean isDone = false;