Convert startGracefulShutdown() to a CompletableFuture
This commit is contained in:
parent
bd96580b0a
commit
6516c36789
2 changed files with 12 additions and 21 deletions
|
|
@ -14,21 +14,20 @@
|
||||||
*/
|
*/
|
||||||
package software.amazon.kinesis.coordinator;
|
package software.amazon.kinesis.coordinator;
|
||||||
|
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.*;
|
||||||
import java.util.concurrent.Future;
|
import java.util.function.Supplier;
|
||||||
import java.util.concurrent.FutureTask;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
class GracefulShutdownCoordinator {
|
class GracefulShutdownCoordinator {
|
||||||
|
|
||||||
Future<Boolean> startGracefulShutdown(Callable<Boolean> shutdownCallable) {
|
CompletableFuture<Boolean> startGracefulShutdown(Callable<Boolean> shutdownCallable) {
|
||||||
FutureTask<Boolean> task = new FutureTask<>(shutdownCallable);
|
CompletableFuture<Boolean> cf = new CompletableFuture<>();
|
||||||
Thread shutdownThread = new Thread(task, "RequestedShutdownThread");
|
CompletableFuture.runAsync(() -> {
|
||||||
shutdownThread.start();
|
try { cf.complete(shutdownCallable.call()); }
|
||||||
return task;
|
catch(Throwable ex) { cf.completeExceptionally(ex); }
|
||||||
|
});
|
||||||
|
return cf;
|
||||||
}
|
}
|
||||||
|
|
||||||
Callable<Boolean> createGracefulShutdownCallable(Callable<GracefulShutdownContext> startWorkerShutdown) {
|
Callable<Boolean> createGracefulShutdownCallable(Callable<GracefulShutdownContext> startWorkerShutdown) {
|
||||||
|
|
|
||||||
|
|
@ -31,15 +31,7 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.*;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
|
||||||
import java.util.concurrent.ConcurrentMap;
|
|
||||||
import java.util.concurrent.CountDownLatch;
|
|
||||||
import java.util.concurrent.ExecutorService;
|
|
||||||
import java.util.concurrent.Executors;
|
|
||||||
import java.util.concurrent.Future;
|
|
||||||
import java.util.concurrent.ThreadLocalRandom;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
|
@ -187,7 +179,7 @@ public class Scheduler implements Runnable {
|
||||||
/**
|
/**
|
||||||
* Used to ensure that only one requestedShutdown is in progress at a time.
|
* Used to ensure that only one requestedShutdown is in progress at a time.
|
||||||
*/
|
*/
|
||||||
private Future<Boolean> gracefulShutdownFuture;
|
private CompletableFuture<Boolean> gracefulShutdownFuture;
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
protected boolean gracefuleShutdownStarted = false;
|
protected boolean gracefuleShutdownStarted = false;
|
||||||
|
|
||||||
|
|
@ -716,7 +708,7 @@ public class Scheduler implements Runnable {
|
||||||
* completed successfully. A false value indicates that a non-exception case caused the shutdown process to
|
* completed successfully. A false value indicates that a non-exception case caused the shutdown process to
|
||||||
* terminate early.
|
* terminate early.
|
||||||
*/
|
*/
|
||||||
public Future<Boolean> startGracefulShutdown() {
|
public CompletableFuture<Boolean> startGracefulShutdown() {
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
if (gracefulShutdownFuture == null) {
|
if (gracefulShutdownFuture == null) {
|
||||||
gracefulShutdownFuture = gracefulShutdownCoordinator
|
gracefulShutdownFuture = gracefulShutdownCoordinator
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue