From 06652d8f889a249e70bdeb74e84291b4e6e2ea4d Mon Sep 17 00:00:00 2001 From: bencvdb Date: Tue, 25 Jun 2019 11:03:14 -0700 Subject: [PATCH 1/7] feature(worker): throw insufficent permissions error --- .../kinesis/clientlibrary/lib/worker/InitializeTask.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java index 5e847a89..11da75e3 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java @@ -110,6 +110,7 @@ class InitializeTask implements ITask { LOG.error("Application initialize() threw exception: ", e); } else { LOG.error("Caught exception: ", e); + throw new RuntimeException(e); } exception = e; // backoff if we encounter an exception. From b8861e50b116f95c0eac6d9f2aef5de3943490f5 Mon Sep 17 00:00:00 2001 From: bencvdb Date: Tue, 25 Jun 2019 14:28:02 -0700 Subject: [PATCH 2/7] feature(worker): throw and handle AmazonDynamDBException --- .../kinesis/clientlibrary/lib/worker/ShardConsumer.java | 4 +++- .../services/kinesis/clientlibrary/lib/worker/Worker.java | 3 +++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java index ae1153b1..f724c801 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java @@ -20,6 +20,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; +import com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -319,13 +320,14 @@ class ShardConsumer { return TaskOutcome.SUCCESSFUL; } logTaskException(result); + throw result.getException(); } catch (Exception e) { + if (e instanceof AmazonDynamoDBException) throw (AmazonDynamoDBException) e; throw new RuntimeException(e); } finally { // Setting future to null so we don't misinterpret task completion status in case of exceptions future = null; } - return TaskOutcome.FAILURE; } private void logTaskException(TaskResult taskResult) { diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 9df8ff73..a4aafb86 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -30,6 +30,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; +import com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException; import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -520,6 +521,8 @@ public class Worker implements Runnable { wlog.info("Sleeping ..."); Thread.sleep(idleTimeInMilliseconds); + } catch (AmazonDynamoDBException e) { + throw e; } catch (Exception e) { if (causedByStreamRecordProcessingError(e)) throw new RuntimeException("Failing worker after irrecoverable failure: " + e.getMessage()); From 08bf7f7000268e382bcdcd0d8ae56ad6ae90de49 Mon Sep 17 00:00:00 2001 From: bencvdb Date: Tue, 25 Jun 2019 15:01:54 -0700 Subject: [PATCH 3/7] feature(worker): remove RuntimeException wrapping --- .../kinesis/clientlibrary/lib/worker/InitializeTask.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java index 11da75e3..5e847a89 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java @@ -110,7 +110,6 @@ class InitializeTask implements ITask { LOG.error("Application initialize() threw exception: ", e); } else { LOG.error("Caught exception: ", e); - throw new RuntimeException(e); } exception = e; // backoff if we encounter an exception. From 9e102af3ddc808de2dc8890f6ba95c372ce82fe1 Mon Sep 17 00:00:00 2001 From: bencvdb Date: Wed, 26 Jun 2019 09:35:59 -0700 Subject: [PATCH 4/7] feature(worker): make Worker#getLeaseCoordinator public --- .../services/kinesis/clientlibrary/lib/worker/Worker.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index a4aafb86..0dbc1e27 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -468,7 +468,7 @@ public class Worker implements Runnable { /** * @return the leaseCoordinator */ - KinesisClientLibLeaseCoordinator getLeaseCoordinator() { + public KinesisClientLibLeaseCoordinator getLeaseCoordinator() { return leaseCoordinator; } @@ -1030,6 +1030,10 @@ public class Worker implements Runnable { return new WorkerThreadPoolExecutor(threadFactory); } + public ExecutorService returnExecutorService() { + return this.executorService; + } + private static void setField(final S source, final String field, final Consumer t, T value) { try { t.accept(value); From d2f95cb5edca8b3b6ad69117a801f59e05699e50 Mon Sep 17 00:00:00 2001 From: bencvdb Date: Wed, 26 Jun 2019 13:51:50 -0700 Subject: [PATCH 5/7] feature(worker): catch errors from runProcessLoop and shutdown --- .../kinesis/clientlibrary/lib/worker/Worker.java | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 0dbc1e27..60bc15cd 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -489,12 +489,15 @@ public class Worker implements Runnable { shutdown(); } - while (!shouldShutdown()) { - runProcessLoop(); + try { + while (!shouldShutdown()) { + runProcessLoop(); + } + } + finally { + finalShutdown(); + LOG.info("Worker loop is complete. Exiting from worker."); } - - finalShutdown(); - LOG.info("Worker loop is complete. Exiting from worker."); } @VisibleForTesting From ce875f3d48b5562ef22c70d4474325e391afe535 Mon Sep 17 00:00:00 2001 From: bencvdb Date: Thu, 27 Jun 2019 09:07:17 -0700 Subject: [PATCH 6/7] feature(worker): remove unnecessary functions and permissions --- .../services/kinesis/clientlibrary/lib/worker/Worker.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 60bc15cd..b4d12e79 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -468,7 +468,7 @@ public class Worker implements Runnable { /** * @return the leaseCoordinator */ - public KinesisClientLibLeaseCoordinator getLeaseCoordinator() { + KinesisClientLibLeaseCoordinator getLeaseCoordinator() { return leaseCoordinator; } @@ -1033,10 +1033,6 @@ public class Worker implements Runnable { return new WorkerThreadPoolExecutor(threadFactory); } - public ExecutorService returnExecutorService() { - return this.executorService; - } - private static void setField(final S source, final String field, final Consumer t, T value) { try { t.accept(value); From a807444f57cabe0590efd32d98fe16db27ac69da Mon Sep 17 00:00:00 2001 From: bencvdb Date: Mon, 1 Jul 2019 14:46:35 -0700 Subject: [PATCH 7/7] fix(Worker): align finally --- .../services/kinesis/clientlibrary/lib/worker/Worker.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index b4d12e79..7ce6f104 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -493,8 +493,7 @@ public class Worker implements Runnable { while (!shouldShutdown()) { runProcessLoop(); } - } - finally { + } finally { finalShutdown(); LOG.info("Worker loop is complete. Exiting from worker."); }