diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/GracefulShutdownCoordinator.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/GracefulShutdownCoordinator.java index e67414c6..29bd699e 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/GracefulShutdownCoordinator.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/GracefulShutdownCoordinator.java @@ -14,21 +14,19 @@ */ package software.amazon.kinesis.coordinator; -import java.util.concurrent.Callable; -import java.util.concurrent.Future; -import java.util.concurrent.FutureTask; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import lombok.extern.slf4j.Slf4j; class GracefulShutdownCoordinator { - Future startGracefulShutdown(Callable shutdownCallable) { - FutureTask task = new FutureTask<>(shutdownCallable); - Thread shutdownThread = new Thread(task, "RequestedShutdownThread"); - shutdownThread.start(); - return task; - + CompletableFuture startGracefulShutdown(Callable shutdownCallable) { + CompletableFuture cf = new CompletableFuture<>(); + CompletableFuture.runAsync(() -> { + try { cf.complete(shutdownCallable.call()); } + catch(Throwable ex) { cf.completeExceptionally(ex); } + }); + return cf; } Callable createGracefulShutdownCallable(Callable startWorkerShutdown) { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index aa0a5568..e9431c15 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -31,15 +31,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; import java.util.function.Function; @@ -187,7 +179,7 @@ public class Scheduler implements Runnable { /** * Used to ensure that only one requestedShutdown is in progress at a time. */ - private Future gracefulShutdownFuture; + private CompletableFuture gracefulShutdownFuture; @VisibleForTesting protected boolean gracefuleShutdownStarted = false; @@ -716,7 +708,7 @@ public class Scheduler implements Runnable { * completed successfully. A false value indicates that a non-exception case caused the shutdown process to * terminate early. */ - public Future startGracefulShutdown() { + public CompletableFuture startGracefulShutdown() { synchronized (this) { if (gracefulShutdownFuture == null) { gracefulShutdownFuture = gracefulShutdownCoordinator