Merge pull request #2 from fivetran/throw_insufficent_permissions_error
feature(worker): halt worker on permissions error
This commit is contained in:
commit
88b6ff2f30
2 changed files with 13 additions and 6 deletions
|
|
@ -20,6 +20,7 @@ import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
import java.util.concurrent.RejectedExecutionException;
|
import java.util.concurrent.RejectedExecutionException;
|
||||||
|
|
||||||
|
import com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
|
|
@ -319,13 +320,14 @@ class ShardConsumer {
|
||||||
return TaskOutcome.SUCCESSFUL;
|
return TaskOutcome.SUCCESSFUL;
|
||||||
}
|
}
|
||||||
logTaskException(result);
|
logTaskException(result);
|
||||||
|
throw result.getException();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
if (e instanceof AmazonDynamoDBException) throw (AmazonDynamoDBException) e;
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
} finally {
|
} finally {
|
||||||
// Setting future to null so we don't misinterpret task completion status in case of exceptions
|
// Setting future to null so we don't misinterpret task completion status in case of exceptions
|
||||||
future = null;
|
future = null;
|
||||||
}
|
}
|
||||||
return TaskOutcome.FAILURE;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void logTaskException(TaskResult taskResult) {
|
private void logTaskException(TaskResult taskResult) {
|
||||||
|
|
|
||||||
|
|
@ -30,6 +30,7 @@ import java.util.concurrent.TimeoutException;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
|
import com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
@ -488,12 +489,14 @@ public class Worker implements Runnable {
|
||||||
shutdown();
|
shutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
while (!shouldShutdown()) {
|
try {
|
||||||
runProcessLoop();
|
while (!shouldShutdown()) {
|
||||||
|
runProcessLoop();
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
finalShutdown();
|
||||||
|
LOG.info("Worker loop is complete. Exiting from worker.");
|
||||||
}
|
}
|
||||||
|
|
||||||
finalShutdown();
|
|
||||||
LOG.info("Worker loop is complete. Exiting from worker.");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
|
|
@ -520,6 +523,8 @@ public class Worker implements Runnable {
|
||||||
|
|
||||||
wlog.info("Sleeping ...");
|
wlog.info("Sleeping ...");
|
||||||
Thread.sleep(idleTimeInMilliseconds);
|
Thread.sleep(idleTimeInMilliseconds);
|
||||||
|
} catch (AmazonDynamoDBException e) {
|
||||||
|
throw e;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
if (causedByStreamRecordProcessingError(e))
|
if (causedByStreamRecordProcessingError(e))
|
||||||
throw new RuntimeException("Failing worker after irrecoverable failure: " + e.getMessage());
|
throw new RuntimeException("Failing worker after irrecoverable failure: " + e.getMessage());
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue