Merge pull request #815 from etspaceman/completableFuture

Convert startGracefulShutdown() to a CompletableFuture
This commit is contained in:
Avinash Chowdary 2021-06-01 14:57:52 -07:00 committed by GitHub
commit 968b47a779
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 11 additions and 21 deletions

View file

@ -14,21 +14,19 @@
*/ */
package software.amazon.kinesis.coordinator; package software.amazon.kinesis.coordinator;
import java.util.concurrent.Callable; import java.util.concurrent.*;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
class GracefulShutdownCoordinator { class GracefulShutdownCoordinator {
Future<Boolean> startGracefulShutdown(Callable<Boolean> shutdownCallable) { CompletableFuture<Boolean> startGracefulShutdown(Callable<Boolean> shutdownCallable) {
FutureTask<Boolean> task = new FutureTask<>(shutdownCallable); CompletableFuture<Boolean> cf = new CompletableFuture<>();
Thread shutdownThread = new Thread(task, "RequestedShutdownThread"); CompletableFuture.runAsync(() -> {
shutdownThread.start(); try { cf.complete(shutdownCallable.call()); }
return task; catch(Throwable ex) { cf.completeExceptionally(ex); }
});
return cf;
} }
Callable<Boolean> createGracefulShutdownCallable(Callable<GracefulShutdownContext> startWorkerShutdown) { Callable<Boolean> createGracefulShutdownCallable(Callable<GracefulShutdownContext> startWorkerShutdown) {

View file

@ -31,15 +31,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Callable; import java.util.concurrent.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Function; import java.util.function.Function;
@ -187,7 +179,7 @@ public class Scheduler implements Runnable {
/** /**
* Used to ensure that only one requestedShutdown is in progress at a time. * Used to ensure that only one requestedShutdown is in progress at a time.
*/ */
private Future<Boolean> gracefulShutdownFuture; private CompletableFuture<Boolean> gracefulShutdownFuture;
@VisibleForTesting @VisibleForTesting
protected boolean gracefuleShutdownStarted = false; protected boolean gracefuleShutdownStarted = false;
@ -716,7 +708,7 @@ public class Scheduler implements Runnable {
* completed successfully. A false value indicates that a non-exception case caused the shutdown process to * completed successfully. A false value indicates that a non-exception case caused the shutdown process to
* terminate early. * terminate early.
*/ */
public Future<Boolean> startGracefulShutdown() { public CompletableFuture<Boolean> startGracefulShutdown() {
synchronized (this) { synchronized (this) {
if (gracefulShutdownFuture == null) { if (gracefulShutdownFuture == null) {
gracefulShutdownFuture = gracefulShutdownCoordinator gracefulShutdownFuture = gracefulShutdownCoordinator