From 6516c36789fedd2635e1aafd4cc33cb4afe7f9f7 Mon Sep 17 00:00:00 2001 From: Eric Meisel Date: Tue, 11 May 2021 16:50:51 -0500 Subject: [PATCH 1/2] Convert startGracefulShutdown() to a CompletableFuture --- .../GracefulShutdownCoordinator.java | 19 +++++++++---------- .../amazon/kinesis/coordinator/Scheduler.java | 14 +++----------- 2 files changed, 12 insertions(+), 21 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/GracefulShutdownCoordinator.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/GracefulShutdownCoordinator.java index e67414c6..981fda52 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/GracefulShutdownCoordinator.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/GracefulShutdownCoordinator.java @@ -14,21 +14,20 @@ */ package software.amazon.kinesis.coordinator; -import java.util.concurrent.Callable; -import java.util.concurrent.Future; -import java.util.concurrent.FutureTask; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; +import java.util.function.Supplier; import lombok.extern.slf4j.Slf4j; class GracefulShutdownCoordinator { - Future startGracefulShutdown(Callable shutdownCallable) { - FutureTask task = new FutureTask<>(shutdownCallable); - Thread shutdownThread = new Thread(task, "RequestedShutdownThread"); - shutdownThread.start(); - return task; - + CompletableFuture startGracefulShutdown(Callable shutdownCallable) { + CompletableFuture cf = new CompletableFuture<>(); + CompletableFuture.runAsync(() -> { + try { cf.complete(shutdownCallable.call()); } + catch(Throwable ex) { cf.completeExceptionally(ex); } + }); + return cf; } Callable createGracefulShutdownCallable(Callable startWorkerShutdown) { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index aa0a5568..e9431c15 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -31,15 +31,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; import java.util.function.Function; @@ -187,7 +179,7 @@ public class Scheduler implements Runnable { /** * Used to ensure that only one requestedShutdown is in progress at a time. */ - private Future gracefulShutdownFuture; + private CompletableFuture gracefulShutdownFuture; @VisibleForTesting protected boolean gracefuleShutdownStarted = false; @@ -716,7 +708,7 @@ public class Scheduler implements Runnable { * completed successfully. A false value indicates that a non-exception case caused the shutdown process to * terminate early. */ - public Future startGracefulShutdown() { + public CompletableFuture startGracefulShutdown() { synchronized (this) { if (gracefulShutdownFuture == null) { gracefulShutdownFuture = gracefulShutdownCoordinator From c5fcfc6ee1e70cb074cfb7b06ecba2ac1e819650 Mon Sep 17 00:00:00 2001 From: Eric Meisel Date: Tue, 11 May 2021 16:57:02 -0500 Subject: [PATCH 2/2] Remove added import --- .../amazon/kinesis/coordinator/GracefulShutdownCoordinator.java | 1 - 1 file changed, 1 deletion(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/GracefulShutdownCoordinator.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/GracefulShutdownCoordinator.java index 981fda52..29bd699e 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/GracefulShutdownCoordinator.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/GracefulShutdownCoordinator.java @@ -15,7 +15,6 @@ package software.amazon.kinesis.coordinator; import java.util.concurrent.*; -import java.util.function.Supplier; import lombok.extern.slf4j.Slf4j;