From b8861e50b116f95c0eac6d9f2aef5de3943490f5 Mon Sep 17 00:00:00 2001 From: bencvdb Date: Tue, 25 Jun 2019 14:28:02 -0700 Subject: [PATCH] feature(worker): throw and handle AmazonDynamDBException --- .../kinesis/clientlibrary/lib/worker/ShardConsumer.java | 4 +++- .../services/kinesis/clientlibrary/lib/worker/Worker.java | 3 +++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java index ae1153b1..f724c801 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java @@ -20,6 +20,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; +import com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -319,13 +320,14 @@ class ShardConsumer { return TaskOutcome.SUCCESSFUL; } logTaskException(result); + throw result.getException(); } catch (Exception e) { + if (e instanceof AmazonDynamoDBException) throw (AmazonDynamoDBException) e; throw new RuntimeException(e); } finally { // Setting future to null so we don't misinterpret task completion status in case of exceptions future = null; } - return TaskOutcome.FAILURE; } private void logTaskException(TaskResult taskResult) { diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 9df8ff73..a4aafb86 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -30,6 +30,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; +import com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException; import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -520,6 +521,8 @@ public class Worker implements Runnable { wlog.info("Sleeping ..."); Thread.sleep(idleTimeInMilliseconds); + } catch (AmazonDynamoDBException e) { + throw e; } catch (Exception e) { if (causedByStreamRecordProcessingError(e)) throw new RuntimeException("Failing worker after irrecoverable failure: " + e.getMessage());