feature(worker): throw and handle AmazonDynamDBException

This commit is contained in:
bencvdb 2019-06-25 14:28:02 -07:00
parent 06652d8f88
commit b8861e50b1
2 changed files with 6 additions and 1 deletions

View file

@ -20,6 +20,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -319,13 +320,14 @@ class ShardConsumer {
return TaskOutcome.SUCCESSFUL; return TaskOutcome.SUCCESSFUL;
} }
logTaskException(result); logTaskException(result);
throw result.getException();
} catch (Exception e) { } catch (Exception e) {
if (e instanceof AmazonDynamoDBException) throw (AmazonDynamoDBException) e;
throw new RuntimeException(e); throw new RuntimeException(e);
} finally { } finally {
// Setting future to null so we don't misinterpret task completion status in case of exceptions // Setting future to null so we don't misinterpret task completion status in case of exceptions
future = null; future = null;
} }
return TaskOutcome.FAILURE;
} }
private void logTaskException(TaskResult taskResult) { private void logTaskException(TaskResult taskResult) {

View file

@ -30,6 +30,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer; import java.util.function.Consumer;
import com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -520,6 +521,8 @@ public class Worker implements Runnable {
wlog.info("Sleeping ..."); wlog.info("Sleeping ...");
Thread.sleep(idleTimeInMilliseconds); Thread.sleep(idleTimeInMilliseconds);
} catch (AmazonDynamoDBException e) {
throw e;
} catch (Exception e) { } catch (Exception e) {
if (causedByStreamRecordProcessingError(e)) if (causedByStreamRecordProcessingError(e))
throw new RuntimeException("Failing worker after irrecoverable failure: " + e.getMessage()); throw new RuntimeException("Failing worker after irrecoverable failure: " + e.getMessage());