From c067cefa1f2e61b72d76b5f2b1770ce4b671d23c Mon Sep 17 00:00:00 2001 From: Justin Pfifer Date: Fri, 21 Jul 2017 08:30:26 -0700 Subject: [PATCH] Make Graceful Shutdown Run On Its Own Thread (#191) * Initial start of fix for requested shutdown * Execute the requested shutdown in a separate thread. Fix for Issue #167 * Reworked some of the shutdown logic to make the relationships clearer. * Added/Updated Copyright Statements * Add Missing License Statements --- build.properties | 10 - .../v2/IShutdownNotificationAware.java | 14 + .../lib/worker/ConsumerStates.java | 14 + .../lib/worker/GracefulShutdownContext.java | 33 + .../worker/GracefulShutdownCoordinator.java | 163 +++++ .../lib/worker/NoOpShardPrioritization.java | 14 + .../ParentsFirstShardPrioritization.java | 14 + .../ShardConsumerShutdownNotification.java | 14 + .../lib/worker/ShardPrioritization.java | 14 + .../lib/worker/ShutdownFuture.java | 155 ---- .../lib/worker/ShutdownNotification.java | 14 + .../lib/worker/ShutdownNotificationTask.java | 14 + .../lib/worker/ThrottlingReporter.java | 14 + .../clientlibrary/lib/worker/Worker.java | 667 ++++++++++-------- .../kinesis/clientlibrary/types/Messages.java | 15 + .../utils/NamedThreadFactory.java | 14 + .../lib/worker/ConsumerStatesTest.java | 14 + .../GracefulShutdownCoordinatorTest.java | 322 +++++++++ ...rentsFirstShardPrioritizationUnitTest.java | 14 + .../lib/worker/ShutdownFutureTest.java | 236 ------- .../lib/worker/ThrottlingReporterTest.java | 14 + .../clientlibrary/lib/worker/WorkerTest.java | 182 ++++- .../proxies/KinesisProxyTest.java | 14 + .../types/ShutdownReasonTest.java | 14 + .../impl/KinesisClientLeaseBuilder.java | 14 + .../services/kinesis/multilang/Matchers.java | 14 + 26 files changed, 1321 insertions(+), 700 deletions(-) delete mode 100644 build.properties create mode 100644 src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GracefulShutdownContext.java create mode 100644 src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GracefulShutdownCoordinator.java delete mode 100644 src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownFuture.java create mode 100644 src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GracefulShutdownCoordinatorTest.java delete mode 100644 src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownFutureTest.java diff --git a/build.properties b/build.properties deleted file mode 100644 index 9a6b868a..00000000 --- a/build.properties +++ /dev/null @@ -1,10 +0,0 @@ -source.. = src/main/java,\ - src/main/resources -output.. = bin/ - -bin.includes = LICENSE.txt,\ - NOTICE.txt,\ - META-INF/,\ - . - -jre.compilation.profile = JavaSE-1.7 diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/v2/IShutdownNotificationAware.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/v2/IShutdownNotificationAware.java index 82a18a0e..b4d4629c 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/v2/IShutdownNotificationAware.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/v2/IShutdownNotificationAware.java @@ -1,3 +1,17 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ package com.amazonaws.services.kinesis.clientlibrary.interfaces.v2; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java index 2d92d7d7..d967b2c3 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java @@ -1,3 +1,17 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; /** diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GracefulShutdownContext.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GracefulShutdownContext.java new file mode 100644 index 00000000..22a4d92b --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GracefulShutdownContext.java @@ -0,0 +1,33 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import lombok.Data; + +import java.util.concurrent.CountDownLatch; + +@Data +class GracefulShutdownContext { + private final CountDownLatch shutdownCompleteLatch; + private final CountDownLatch notificationCompleteLatch; + private final Worker worker; + + static GracefulShutdownContext SHUTDOWN_ALREADY_COMPLETED = new GracefulShutdownContext(null, null, null); + + boolean isShutdownAlreadyCompleted() { + return shutdownCompleteLatch == null && notificationCompleteLatch == null && worker == null; + } + +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GracefulShutdownCoordinator.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GracefulShutdownCoordinator.java new file mode 100644 index 00000000..97bef9e3 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GracefulShutdownCoordinator.java @@ -0,0 +1,163 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +class GracefulShutdownCoordinator { + + Future startGracefulShutdown(Callable shutdownCallable) { + FutureTask task = new FutureTask<>(shutdownCallable); + Thread shutdownThread = new Thread(task, "RequestedShutdownThread"); + shutdownThread.start(); + return task; + + } + + Callable createGracefulShutdownCallable(Callable startWorkerShutdown) { + return new GracefulShutdownCallable(startWorkerShutdown); + } + + static class GracefulShutdownCallable implements Callable { + + private static final Log log = LogFactory.getLog(GracefulShutdownCallable.class); + + private final Callable startWorkerShutdown; + + GracefulShutdownCallable(Callable startWorkerShutdown) { + this.startWorkerShutdown = startWorkerShutdown; + } + + private boolean isWorkerShutdownComplete(GracefulShutdownContext context) { + return context.getWorker().isShutdownComplete() || context.getWorker().getShardInfoShardConsumerMap().isEmpty(); + } + + private String awaitingLogMessage(GracefulShutdownContext context) { + long awaitingNotification = context.getNotificationCompleteLatch().getCount(); + long awaitingFinalShutdown = context.getShutdownCompleteLatch().getCount(); + + return String.format( + "Waiting for %d record process to complete shutdown notification, and %d record processor to complete final shutdown ", + awaitingNotification, awaitingFinalShutdown); + } + + private String awaitingFinalShutdownMessage(GracefulShutdownContext context) { + long outstanding = context.getShutdownCompleteLatch().getCount(); + return String.format("Waiting for %d record processors to complete final shutdown", outstanding); + } + + private boolean waitForRecordProcessors(GracefulShutdownContext context) { + + // + // Awaiting for all ShardConsumer/RecordProcessors to be notified that a shutdown has been requested. + // There is the possibility of a race condition where a lease is terminated after the shutdown request + // notification is started, but before the ShardConsumer is sent the notification. In this case the + // ShardConsumer would start the lease loss shutdown, and may never call the notification methods. + // + try { + while (!context.getNotificationCompleteLatch().await(1, TimeUnit.SECONDS)) { + if (Thread.interrupted()) { + throw new InterruptedException(); + } + log.info(awaitingLogMessage(context)); + if (workerShutdownWithRemaining(context.getShutdownCompleteLatch().getCount(), context)) { + return false; + } + } + } catch (InterruptedException ie) { + log.warn("Interrupted while waiting for notification complete, terminating shutdown. " + + awaitingLogMessage(context)); + return false; + } + + if (Thread.interrupted()) { + log.warn("Interrupted before worker shutdown, terminating shutdown"); + return false; + } + + // + // Once all record processors have been notified of the shutdown it is safe to allow the worker to + // start its shutdown behavior. Once shutdown starts it will stop renewer, and drop any remaining leases. + // + context.getWorker().shutdown(); + + if (Thread.interrupted()) { + log.warn("Interrupted after worker shutdown, terminating shutdown"); + return false; + } + + // + // Want to wait for all the remaining ShardConsumers/RecordProcessor's to complete their final shutdown + // processing. This should really be a no-op since as part of the notification completion the lease for + // ShardConsumer is terminated. + // + try { + while (!context.getShutdownCompleteLatch().await(1, TimeUnit.SECONDS)) { + if (Thread.interrupted()) { + throw new InterruptedException(); + } + log.info(awaitingFinalShutdownMessage(context)); + if (workerShutdownWithRemaining(context.getShutdownCompleteLatch().getCount(), context)) { + return false; + } + } + } catch (InterruptedException ie) { + log.warn("Interrupted while waiting for shutdown completion, terminating shutdown. " + + awaitingFinalShutdownMessage(context)); + return false; + } + return true; + } + + /** + * This checks to see if the worker has already hit it's shutdown target, while there is outstanding record + * processors. This maybe a little racy due to when the value of outstanding is retrieved. In general though the + * latch should be decremented before the shutdown completion. + * + * @param outstanding + * the number of record processor still awaiting shutdown. + */ + private boolean workerShutdownWithRemaining(long outstanding, GracefulShutdownContext context) { + if (isWorkerShutdownComplete(context)) { + if (outstanding != 0) { + log.info("Shutdown completed, but shutdownCompleteLatch still had outstanding " + outstanding + + " with a current value of " + context.getShutdownCompleteLatch().getCount() + ". shutdownComplete: " + + context.getWorker().isShutdownComplete() + " -- Consumer Map: " + + context.getWorker().getShardInfoShardConsumerMap().size()); + return true; + } + } + return false; + } + + @Override + public Boolean call() throws Exception { + GracefulShutdownContext context; + try { + context = startWorkerShutdown.call(); + } catch (Exception ex) { + log.warn("Caught exception while requesting initial worker shutdown.", ex); + throw ex; + } + return context.isShutdownAlreadyCompleted() || waitForRecordProcessors(context); + } + } +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/NoOpShardPrioritization.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/NoOpShardPrioritization.java index b2f46d13..59a42199 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/NoOpShardPrioritization.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/NoOpShardPrioritization.java @@ -1,3 +1,17 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import java.util.List; diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ParentsFirstShardPrioritization.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ParentsFirstShardPrioritization.java index dbacbd98..8e211eef 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ParentsFirstShardPrioritization.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ParentsFirstShardPrioritization.java @@ -1,3 +1,17 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import java.util.ArrayList; diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerShutdownNotification.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerShutdownNotification.java index b3792131..aa5a7942 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerShutdownNotification.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerShutdownNotification.java @@ -1,3 +1,17 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import java.util.concurrent.CountDownLatch; diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardPrioritization.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardPrioritization.java index 54f7517d..442c37dd 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardPrioritization.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardPrioritization.java @@ -1,3 +1,17 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import java.util.List; diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownFuture.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownFuture.java deleted file mode 100644 index 8ee96537..00000000 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownFuture.java +++ /dev/null @@ -1,155 +0,0 @@ -package com.amazonaws.services.kinesis.clientlibrary.lib.worker; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -/** - * Used as a response from the {@link Worker#requestShutdown()} to allow callers to wait until shutdown is complete. - */ -class ShutdownFuture implements Future { - - private static final Log log = LogFactory.getLog(ShutdownFuture.class); - - private final CountDownLatch shutdownCompleteLatch; - private final CountDownLatch notificationCompleteLatch; - private final Worker worker; - - ShutdownFuture(CountDownLatch shutdownCompleteLatch, CountDownLatch notificationCompleteLatch, Worker worker) { - this.shutdownCompleteLatch = shutdownCompleteLatch; - this.notificationCompleteLatch = notificationCompleteLatch; - this.worker = worker; - } - - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - throw new UnsupportedOperationException("Cannot cancel a shutdown process"); - } - - @Override - public boolean isCancelled() { - return false; - } - - @Override - public boolean isDone() { - return isWorkerShutdownComplete(); - } - - private boolean isWorkerShutdownComplete() { - return worker.isShutdownComplete() || worker.getShardInfoShardConsumerMap().isEmpty(); - } - - private long outstandingRecordProcessors(long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { - - final long startNanos = System.nanoTime(); - - // - // Awaiting for all ShardConsumer/RecordProcessors to be notified that a shutdown has been requested. - // There is the possibility of a race condition where a lease is terminated after the shutdown request - // notification is started, but before the ShardConsumer is sent the notification. In this case the - // ShardConsumer would start the lease loss shutdown, and may never call the notification methods. - // - if (!notificationCompleteLatch.await(timeout, unit)) { - long awaitingNotification = notificationCompleteLatch.getCount(); - long awaitingFinalShutdown = shutdownCompleteLatch.getCount(); - log.info("Awaiting " + awaitingNotification + " record processors to complete shutdown notification, and " - + awaitingFinalShutdown + " awaiting final shutdown"); - if (awaitingFinalShutdown != 0) { - // - // The number of record processor awaiting final shutdown should be a superset of the those awaiting - // notification - // - return checkWorkerShutdownMiss(awaitingFinalShutdown); - } - } - - long remaining = remainingTimeout(timeout, unit, startNanos); - throwTimeoutMessageIfExceeded(remaining, "Notification hasn't completed within timeout time."); - - // - // Once all record processors have been notified of the shutdown it is safe to allow the worker to - // start its shutdown behavior. Once shutdown starts it will stop renewer, and drop any remaining leases. - // - worker.shutdown(); - remaining = remainingTimeout(timeout, unit, startNanos); - throwTimeoutMessageIfExceeded(remaining, "Shutdown hasn't completed within timeout time."); - - // - // Want to wait for all the remaining ShardConsumers/RecordProcessor's to complete their final shutdown - // processing. This should really be a no-op since as part of the notification completion the lease for - // ShardConsumer is terminated. - // - if (!shutdownCompleteLatch.await(remaining, TimeUnit.NANOSECONDS)) { - long outstanding = shutdownCompleteLatch.getCount(); - log.info("Awaiting " + outstanding + " record processors to complete final shutdown"); - - return checkWorkerShutdownMiss(outstanding); - } - return 0; - } - - private long remainingTimeout(long timeout, TimeUnit unit, long startNanos) { - long checkNanos = System.nanoTime() - startNanos; - return unit.toNanos(timeout) - checkNanos; - } - - private void throwTimeoutMessageIfExceeded(long remainingNanos, String message) throws TimeoutException { - if (remainingNanos <= 0) { - throw new TimeoutException(message); - } - } - - /** - * This checks to see if the worker has already hit it's shutdown target, while there is outstanding record - * processors. This maybe a little racy due to when the value of outstanding is retrieved. In general though the - * latch should be decremented before the shutdown completion. - * - * @param outstanding - * the number of record processor still awaiting shutdown. - * @return the number of record processors awaiting shutdown, or 0 if the worker believes it's shutdown already. - */ - private long checkWorkerShutdownMiss(long outstanding) { - if (isWorkerShutdownComplete()) { - if (outstanding != 0) { - log.info("Shutdown completed, but shutdownCompleteLatch still had outstanding " + outstanding - + " with a current value of " + shutdownCompleteLatch.getCount() + ". shutdownComplete: " - + worker.isShutdownComplete() + " -- Consumer Map: " - + worker.getShardInfoShardConsumerMap().size()); - } - return 0; - } - return outstanding; - } - - @Override - public Void get() throws InterruptedException, ExecutionException { - boolean complete = false; - do { - try { - long outstanding = outstandingRecordProcessors(1, TimeUnit.SECONDS); - complete = outstanding == 0; - log.info("Awaiting " + outstanding + " consumer(s) to finish shutdown."); - } catch (TimeoutException te) { - log.info("Timeout while waiting for completion: " + te.getMessage()); - } - - } while(!complete); - return null; - } - - @Override - public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - long outstanding = outstandingRecordProcessors(timeout, unit); - if (outstanding != 0) { - throw new TimeoutException("Awaiting " + outstanding + " record processors to shutdown."); - } - return null; - } -} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownNotification.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownNotification.java index 928e6900..8fd492cf 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownNotification.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownNotification.java @@ -1,3 +1,17 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownNotificationTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownNotificationTask.java index a689ee43..11997367 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownNotificationTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownNotificationTask.java @@ -1,3 +1,17 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ThrottlingReporter.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ThrottlingReporter.java index f88f131f..f80bdd29 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ThrottlingReporter.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ThrottlingReporter.java @@ -1,3 +1,17 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import lombok.Getter; diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 86cd751e..fd461e31 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -1,16 +1,16 @@ /* - * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at * - * http://aws.amazon.com/asl/ + * http://aws.amazon.com/asl/ * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; @@ -18,15 +18,18 @@ import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -52,14 +55,12 @@ import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory; import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; import com.google.common.annotations.VisibleForTesting; -import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ThreadFactoryBuilder; /** - * Worker is the high level class that Kinesis applications use to start - * processing data. It initializes and oversees different components (e.g. - * syncing shard and lease information, tracking shard assignments, and - * processing data from the shards). + * Worker is the high level class that Kinesis applications use to start processing data. It initializes and oversees + * different components (e.g. syncing shard and lease information, tracking shard assignments, and processing data from + * the shards). */ public class Worker implements Runnable { @@ -77,7 +78,7 @@ public class Worker implements Runnable { private final long idleTimeInMilliseconds; // Backoff time when polling to check if application has finished processing // parent shards - private final long parentShardPollIntervalMillis; + private final long parentShardPollIntervalMillis; private final ExecutorService executorService; private final IMetricsFactory metricsFactory; // Backoff time when running tasks if they encounter exceptions @@ -96,17 +97,27 @@ public class Worker implements Runnable { // Holds consumers for shards the worker is currently tracking. Key is shard // info, value is ShardConsumer. - private ConcurrentMap shardInfoShardConsumerMap = - new ConcurrentHashMap(); + private ConcurrentMap shardInfoShardConsumerMap = new ConcurrentHashMap(); private final boolean cleanupLeasesUponShardCompletion; - + private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist; + /** + * Used to ensure that only one requestedShutdown is in progress at a time. + */ + private Future gracefulShutdownFuture; + @VisibleForTesting + protected boolean gracefuleShutdownStarted = false; + @VisibleForTesting + protected GracefulShutdownCoordinator gracefulShutdownCoordinator = new GracefulShutdownCoordinator(); + /** * Constructor. * - * @param recordProcessorFactory Used to get record processor instances for processing data from shards - * @param config Kinesis Client Library configuration + * @param recordProcessorFactory + * Used to get record processor instances for processing data from shards + * @param config + * Kinesis Client Library configuration */ public Worker( com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, @@ -117,120 +128,128 @@ public class Worker implements Runnable { /** * Constructor. * - * @param recordProcessorFactory Used to get record processor instances for processing data from shards - * @param config Kinesis Client Library configuration - * @param execService ExecutorService to use for processing records (support for multi-threaded - * consumption) + * @param recordProcessorFactory + * Used to get record processor instances for processing data from shards + * @param config + * Kinesis Client Library configuration + * @param execService + * ExecutorService to use for processing records (support for multi-threaded consumption) */ public Worker( com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, - KinesisClientLibConfiguration config, - ExecutorService execService) { - this(recordProcessorFactory, config, new AmazonKinesisClient(config.getKinesisCredentialsProvider(), - config.getKinesisClientConfiguration()), + KinesisClientLibConfiguration config, ExecutorService execService) { + this(recordProcessorFactory, config, + new AmazonKinesisClient(config.getKinesisCredentialsProvider(), config.getKinesisClientConfiguration()), new AmazonDynamoDBClient(config.getDynamoDBCredentialsProvider(), config.getDynamoDBClientConfiguration()), new AmazonCloudWatchClient(config.getCloudWatchCredentialsProvider(), - config.getCloudWatchClientConfiguration()), execService); + config.getCloudWatchClientConfiguration()), + execService); } /** - * @param recordProcessorFactory Used to get record processor instances for processing data from shards - * @param config Kinesis Client Library configuration - * @param metricsFactory Metrics factory used to emit metrics + * @param recordProcessorFactory + * Used to get record processor instances for processing data from shards + * @param config + * Kinesis Client Library configuration + * @param metricsFactory + * Metrics factory used to emit metrics */ public Worker( com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, - KinesisClientLibConfiguration config, - IMetricsFactory metricsFactory) { + KinesisClientLibConfiguration config, IMetricsFactory metricsFactory) { this(recordProcessorFactory, config, metricsFactory, getExecutorService()); } /** - * @param recordProcessorFactory Used to get record processor instances for processing data from shards - * @param config Kinesis Client Library configuration - * @param metricsFactory Metrics factory used to emit metrics - * @param execService ExecutorService to use for processing records (support for multi-threaded - * consumption) + * @param recordProcessorFactory + * Used to get record processor instances for processing data from shards + * @param config + * Kinesis Client Library configuration + * @param metricsFactory + * Metrics factory used to emit metrics + * @param execService + * ExecutorService to use for processing records (support for multi-threaded consumption) */ public Worker( com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, - KinesisClientLibConfiguration config, - IMetricsFactory metricsFactory, - ExecutorService execService) { - this(recordProcessorFactory, config, new AmazonKinesisClient(config.getKinesisCredentialsProvider(), - config.getKinesisClientConfiguration()), + KinesisClientLibConfiguration config, IMetricsFactory metricsFactory, ExecutorService execService) { + this(recordProcessorFactory, config, + new AmazonKinesisClient(config.getKinesisCredentialsProvider(), config.getKinesisClientConfiguration()), new AmazonDynamoDBClient(config.getDynamoDBCredentialsProvider(), - config.getDynamoDBClientConfiguration()), metricsFactory, execService); + config.getDynamoDBClientConfiguration()), + metricsFactory, execService); } /** - * @param recordProcessorFactory Used to get record processor instances for processing data from shards - * @param config Kinesis Client Library configuration - * @param kinesisClient Kinesis Client used for fetching data - * @param dynamoDBClient DynamoDB client used for checkpoints and tracking leases - * @param cloudWatchClient CloudWatch Client for publishing metrics + * @param recordProcessorFactory + * Used to get record processor instances for processing data from shards + * @param config + * Kinesis Client Library configuration + * @param kinesisClient + * Kinesis Client used for fetching data + * @param dynamoDBClient + * DynamoDB client used for checkpoints and tracking leases + * @param cloudWatchClient + * CloudWatch Client for publishing metrics */ public Worker( com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, - KinesisClientLibConfiguration config, - AmazonKinesis kinesisClient, - AmazonDynamoDB dynamoDBClient, + KinesisClientLibConfiguration config, AmazonKinesis kinesisClient, AmazonDynamoDB dynamoDBClient, AmazonCloudWatch cloudWatchClient) { this(recordProcessorFactory, config, kinesisClient, dynamoDBClient, cloudWatchClient, getExecutorService()); } /** - * @param recordProcessorFactory Used to get record processor instances for processing data from shards - * @param config Kinesis Client Library configuration - * @param kinesisClient Kinesis Client used for fetching data - * @param dynamoDBClient DynamoDB client used for checkpoints and tracking leases - * @param cloudWatchClient CloudWatch Client for publishing metrics - * @param execService ExecutorService to use for processing records (support for multi-threaded - * consumption) + * @param recordProcessorFactory + * Used to get record processor instances for processing data from shards + * @param config + * Kinesis Client Library configuration + * @param kinesisClient + * Kinesis Client used for fetching data + * @param dynamoDBClient + * DynamoDB client used for checkpoints and tracking leases + * @param cloudWatchClient + * CloudWatch Client for publishing metrics + * @param execService + * ExecutorService to use for processing records (support for multi-threaded consumption) */ public Worker( com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, - KinesisClientLibConfiguration config, - AmazonKinesis kinesisClient, - AmazonDynamoDB dynamoDBClient, - AmazonCloudWatch cloudWatchClient, - ExecutorService execService) { - this(recordProcessorFactory, config, kinesisClient, dynamoDBClient, - getMetricsFactory(cloudWatchClient, config), execService); + KinesisClientLibConfiguration config, AmazonKinesis kinesisClient, AmazonDynamoDB dynamoDBClient, + AmazonCloudWatch cloudWatchClient, ExecutorService execService) { + this(recordProcessorFactory, config, kinesisClient, dynamoDBClient, getMetricsFactory(cloudWatchClient, config), + execService); } /** - * @param recordProcessorFactory Used to get record processor instances for processing data from shards - * @param config Kinesis Client Library configuration - * @param kinesisClient Kinesis Client used for fetching data - * @param dynamoDBClient DynamoDB client used for checkpoints and tracking leases - * @param metricsFactory Metrics factory used to emit metrics - * @param execService ExecutorService to use for processing records (support for multi-threaded - * consumption) + * @param recordProcessorFactory + * Used to get record processor instances for processing data from shards + * @param config + * Kinesis Client Library configuration + * @param kinesisClient + * Kinesis Client used for fetching data + * @param dynamoDBClient + * DynamoDB client used for checkpoints and tracking leases + * @param metricsFactory + * Metrics factory used to emit metrics + * @param execService + * ExecutorService to use for processing records (support for multi-threaded consumption) */ public Worker( com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, - KinesisClientLibConfiguration config, - AmazonKinesis kinesisClient, - AmazonDynamoDB dynamoDBClient, - IMetricsFactory metricsFactory, - ExecutorService execService) { - this( - config.getApplicationName(), - new V1ToV2RecordProcessorFactoryAdapter(recordProcessorFactory), + KinesisClientLibConfiguration config, AmazonKinesis kinesisClient, AmazonDynamoDB dynamoDBClient, + IMetricsFactory metricsFactory, ExecutorService execService) { + this(config.getApplicationName(), new V1ToV2RecordProcessorFactoryAdapter(recordProcessorFactory), new StreamConfig( new KinesisProxyFactory(config.getKinesisCredentialsProvider(), kinesisClient) - .getProxy(config.getStreamName()), + .getProxy(config.getStreamName()), config.getMaxRecords(), config.getIdleTimeBetweenReadsInMillis(), config.shouldCallProcessRecordsEvenForEmptyRecordList(), config.shouldValidateSequenceNumberBeforeCheckpointing(), config.getInitialPositionInStreamExtended()), - config.getInitialPositionInStreamExtended(), - config.getParentShardPollIntervalMillis(), - config.getShardSyncIntervalMillis(), - config.shouldCleanupLeasesUponShardCompletion(), - null, + config.getInitialPositionInStreamExtended(), config.getParentShardPollIntervalMillis(), + config.getShardSyncIntervalMillis(), config.shouldCleanupLeasesUponShardCompletion(), null, new KinesisClientLibLeaseCoordinator( new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient), config.getWorkerIdentifier(), @@ -270,48 +289,50 @@ public class Worker implements Runnable { + ", and Amazon Kinesis endpoint as " + config.getKinesisEndpoint() + ". Amazon Kinesis endpoint will overwrite region name."); LOG.debug("The region of Amazon Kinesis client has been overwritten to " + config.getKinesisEndpoint()); - } else { + } else { LOG.debug("The region of Amazon Kinesis client has been set to " + config.getKinesisEndpoint()); } } } /** - * @param applicationName Name of the Kinesis application - * @param recordProcessorFactory Used to get record processor instances for processing data from shards - * @param streamConfig Stream configuration - * @param initialPositionInStream One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. The KinesisClientLibrary will start - * fetching data from this location in the stream when an application starts up for the first time and - * there are no checkpoints. If there are checkpoints, we start from the checkpoint position. - * @param parentShardPollIntervalMillis Wait for this long between polls to check if parent shards are done - * @param shardSyncIdleTimeMillis Time between tasks to sync leases and Kinesis shards - * @param cleanupLeasesUponShardCompletion Clean up shards we've finished processing (don't wait till they expire in - * Kinesis) - * @param checkpoint Used to get/set checkpoints - * @param leaseCoordinator Lease coordinator (coordinates currently owned leases) - * @param execService ExecutorService to use for processing records (support for multi-threaded - * consumption) - * @param metricsFactory Metrics factory used to emit metrics - * @param taskBackoffTimeMillis Backoff period when tasks encounter an exception - * @param shardPrioritization Provides prioritization logic to decide which available shards process first + * @param applicationName + * Name of the Kinesis application + * @param recordProcessorFactory + * Used to get record processor instances for processing data from shards + * @param streamConfig + * Stream configuration + * @param initialPositionInStream + * One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. The KinesisClientLibrary will start fetching data from + * this location in the stream when an application starts up for the first time and there are no + * checkpoints. If there are checkpoints, we start from the checkpoint position. + * @param parentShardPollIntervalMillis + * Wait for this long between polls to check if parent shards are done + * @param shardSyncIdleTimeMillis + * Time between tasks to sync leases and Kinesis shards + * @param cleanupLeasesUponShardCompletion + * Clean up shards we've finished processing (don't wait till they expire in Kinesis) + * @param checkpoint + * Used to get/set checkpoints + * @param leaseCoordinator + * Lease coordinator (coordinates currently owned leases) + * @param execService + * ExecutorService to use for processing records (support for multi-threaded consumption) + * @param metricsFactory + * Metrics factory used to emit metrics + * @param taskBackoffTimeMillis + * Backoff period when tasks encounter an exception + * @param shardPrioritization + * Provides prioritization logic to decide which available shards process first */ // NOTE: This has package level access solely for testing // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES - Worker(String applicationName, - IRecordProcessorFactory recordProcessorFactory, - StreamConfig streamConfig, - InitialPositionInStreamExtended initialPositionInStream, - long parentShardPollIntervalMillis, - long shardSyncIdleTimeMillis, - boolean cleanupLeasesUponShardCompletion, - ICheckpoint checkpoint, - KinesisClientLibLeaseCoordinator leaseCoordinator, - ExecutorService execService, - IMetricsFactory metricsFactory, - long taskBackoffTimeMillis, - long failoverTimeMillis, - boolean skipShardSyncAtWorkerInitializationIfLeasesExist, - ShardPrioritization shardPrioritization) { + Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, StreamConfig streamConfig, + InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis, + long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint, + KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, + IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, + boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) { this.applicationName = applicationName; this.recordProcessorFactory = recordProcessorFactory; this.streamConfig = streamConfig; @@ -323,16 +344,11 @@ public class Worker implements Runnable { this.executorService = execService; this.leaseCoordinator = leaseCoordinator; this.metricsFactory = metricsFactory; - this.controlServer = - new ShardSyncTaskManager(streamConfig.getStreamProxy(), - leaseCoordinator.getLeaseManager(), - initialPositionInStream, - cleanupLeasesUponShardCompletion, - shardSyncIdleTimeMillis, - metricsFactory, - executorService); + this.controlServer = new ShardSyncTaskManager(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(), + initialPositionInStream, cleanupLeasesUponShardCompletion, shardSyncIdleTimeMillis, metricsFactory, + executorService); this.taskBackoffTimeMillis = taskBackoffTimeMillis; - this.failoverTimeMillis = failoverTimeMillis; + this.failoverTimeMillis = failoverTimeMillis; this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist; this.shardPrioritization = shardPrioritization; } @@ -345,8 +361,7 @@ public class Worker implements Runnable { } /** - * Start consuming data from the stream, and pass it to the application - * record processors. + * Start consuming data from the stream, and pass it to the application record processors. */ public void run() { if (shutdown) { @@ -419,12 +434,8 @@ public class Worker implements Runnable { if (!skipShardSyncAtWorkerInitializationIfLeasesExist || leaseCoordinator.getLeaseManager().isLeaseTableEmpty()) { LOG.info("Syncing Kinesis shard info"); - ShardSyncTask shardSyncTask = - new ShardSyncTask(streamConfig.getStreamProxy(), - leaseCoordinator.getLeaseManager(), - initialPosition, - cleanupLeasesUponShardCompletion, - 0L); + ShardSyncTask shardSyncTask = new ShardSyncTask(streamConfig.getStreamProxy(), + leaseCoordinator.getLeaseManager(), initialPosition, cleanupLeasesUponShardCompletion, 0L); result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call(); } else { LOG.info("Skipping shard sync per config setting (and lease table is not empty)"); @@ -461,14 +472,13 @@ public class Worker implements Runnable { } /** - * NOTE: This method is internal/private to the Worker class. It has package - * access solely for testing. + * NOTE: This method is internal/private to the Worker class. It has package access solely for testing. * * This method relies on ShardInfo.equals() method returning true for ShardInfo objects which may have been * instantiated with parentShardIds in a different order (and rest of the fields being the equal). For example - * shardInfo1.equals(shardInfo2) should return true with shardInfo1 and shardInfo2 defined as follows. - * ShardInfo shardInfo1 = new ShardInfo(shardId1, concurrencyToken1, Arrays.asList("parent1", "parent2")); - * ShardInfo shardInfo2 = new ShardInfo(shardId1, concurrencyToken1, Arrays.asList("parent2", "parent1")); + * shardInfo1.equals(shardInfo2) should return true with shardInfo1 and shardInfo2 defined as follows. ShardInfo + * shardInfo1 = new ShardInfo(shardId1, concurrencyToken1, Arrays.asList("parent1", "parent2")); ShardInfo + * shardInfo2 = new ShardInfo(shardId1, concurrencyToken1, Arrays.asList("parent2", "parent1")); */ void cleanupShardConsumers(Set assignedShards) { for (ShardInfo shard : shardInfoShardConsumerMap.keySet()) { @@ -508,8 +518,57 @@ public class Worker implements Runnable { } /** - * Requests shutdown of the worker, notifying record processors, that implement {@link IShutdownNotificationAware}, - * of the impending shutdown. This gives the record processor a final chance to checkpoint. + * Starts the requestedShutdown process, and returns a future that can be used to track the process. + * + * This is deprecated in favor of {@link #startGracefulShutdown()}, which returns a more complete future, and + * indicates the process behavior + * + * @return a future that will be set once shutdown is completed. + */ + @Deprecated + public Future requestShutdown() { + + Future requestedShutdownFuture = startGracefulShutdown(); + + return new Future() { + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return requestedShutdownFuture.cancel(mayInterruptIfRunning); + } + + @Override + public boolean isCancelled() { + return requestedShutdownFuture.isCancelled(); + } + + @Override + public boolean isDone() { + return requestedShutdownFuture.isDone(); + } + + @Override + public Void get() throws InterruptedException, ExecutionException { + requestedShutdownFuture.get(); + return null; + } + + @Override + public Void get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + requestedShutdownFuture.get(timeout, unit); + return null; + } + }; + } + + /** + * Requests a graceful shutdown of the worker, notifying record processors, that implement + * {@link IShutdownNotificationAware}, of the impending shutdown. This gives the record processor a final chance to + * checkpoint. + * + * This will only create a single shutdown future. Additional attempts to start a graceful shutdown will return the + * previous future. * * It's possible that a record processor won't be notify before being shutdown. This can occur if the lease is * lost after requesting shutdown, but before the notification is dispatched. @@ -533,48 +592,85 @@ public class Worker implements Runnable { *
  • Once the worker shutdown is complete, the returned future is completed.
  • * * - * - * - * @return a Future that will be set once the shutdown is complete. + * @return a future that will be set once the shutdown has completed. True indicates that the graceful shutdown + * completed successfully. A false value indicates that a non-exception case caused the shutdown process to + * terminate early. */ - public Future requestShutdown() { - - // - // Stop accepting new leases. Once we do this we can be sure that - // no more leases will be acquired. - // - leaseCoordinator.stopLeaseTaker(); - - Collection leases = leaseCoordinator.getAssignments(); - if (leases == null || leases.isEmpty()) { - // - // If there are no leases notification is already completed, but we still need to shutdown the worker. - // - this.shutdown(); - return Futures.immediateFuture(null); - } - CountDownLatch shutdownCompleteLatch = new CountDownLatch(leases.size()); - CountDownLatch notificationCompleteLatch = new CountDownLatch(leases.size()); - for (KinesisClientLease lease : leases) { - ShutdownNotification shutdownNotification = new ShardConsumerShutdownNotification(leaseCoordinator, lease, - notificationCompleteLatch, shutdownCompleteLatch); - ShardInfo shardInfo = KinesisClientLibLeaseCoordinator.convertLeaseToAssignment(lease); - ShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo); - if (consumer != null) { - consumer.notifyShutdownRequested(shutdownNotification); - } else { - // - // There is a race condition between retrieving the current assignments, and creating the - // notification. If the a lease is lost in between these two points, we explicitly decrement the - // notification latches to clear the shutdown. - // - notificationCompleteLatch.countDown(); - shutdownCompleteLatch.countDown(); + public Future startGracefulShutdown() { + synchronized (this) { + if (gracefulShutdownFuture == null) { + gracefulShutdownFuture = gracefulShutdownCoordinator + .startGracefulShutdown(createGracefulShutdownCallable()); } } + return gracefulShutdownFuture; + } - return new ShutdownFuture(shutdownCompleteLatch, notificationCompleteLatch, this); + /** + * Creates a callable that will execute the graceful shutdown process. This callable can be used to execute graceful + * shutdowns in your own executor, or execute the shutdown synchronously. + * + * @return a callable that run the graceful shutdown process. This may return a callable that return true if the + * graceful shutdown has already been completed. + * @throws IllegalStateException + * thrown by the callable if another callable has already started the shutdown process. + */ + public Callable createGracefulShutdownCallable() { + if (isShutdownComplete()) { + return () -> true; + } + Callable startShutdown = createWorkerShutdownCallable(); + return gracefulShutdownCoordinator.createGracefulShutdownCallable(startShutdown); + } + public boolean hasGracefulShutdownStarted() { + return gracefuleShutdownStarted; + } + + @VisibleForTesting + Callable createWorkerShutdownCallable() { + return () -> { + synchronized (this) { + if (this.gracefuleShutdownStarted) { + throw new IllegalStateException("Requested shutdown has already been started"); + } + this.gracefuleShutdownStarted = true; + } + // + // Stop accepting new leases. Once we do this we can be sure that + // no more leases will be acquired. + // + leaseCoordinator.stopLeaseTaker(); + + Collection leases = leaseCoordinator.getAssignments(); + if (leases == null || leases.isEmpty()) { + // + // If there are no leases notification is already completed, but we still need to shutdown the worker. + // + this.shutdown(); + return GracefulShutdownContext.SHUTDOWN_ALREADY_COMPLETED; + } + CountDownLatch shutdownCompleteLatch = new CountDownLatch(leases.size()); + CountDownLatch notificationCompleteLatch = new CountDownLatch(leases.size()); + for (KinesisClientLease lease : leases) { + ShutdownNotification shutdownNotification = new ShardConsumerShutdownNotification(leaseCoordinator, + lease, notificationCompleteLatch, shutdownCompleteLatch); + ShardInfo shardInfo = KinesisClientLibLeaseCoordinator.convertLeaseToAssignment(lease); + ShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo); + if (consumer != null) { + consumer.notifyShutdownRequested(shutdownNotification); + } else { + // + // There is a race condition between retrieving the current assignments, and creating the + // notification. If the a lease is lost in between these two points, we explicitly decrement the + // notification latches to clear the shutdown. + // + notificationCompleteLatch.countDown(); + shutdownCompleteLatch.countDown(); + } + } + return new GracefulShutdownContext(shutdownCompleteLatch, notificationCompleteLatch, this); + }; } boolean isShutdownComplete() { @@ -618,8 +714,8 @@ public class Worker implements Runnable { } /** - * Perform final shutdown related tasks for the worker including shutting down worker owned - * executor services, threads, etc. + * Perform final shutdown related tasks for the worker including shutting down worker owned executor services, + * threads, etc. */ private void finalShutdown() { LOG.info("Starting worker's final shutdown."); @@ -660,11 +756,12 @@ public class Worker implements Runnable { } /** - * NOTE: This method is internal/private to the Worker class. It has package - * access solely for testing. + * NOTE: This method is internal/private to the Worker class. It has package access solely for testing. * - * @param shardInfo Kinesis shard info - * @param factory RecordProcessor factory + * @param shardInfo + * Kinesis shard info + * @param factory + * RecordProcessor factory * @return ShardConsumer for the shard */ ShardConsumer createOrGetShardConsumer(ShardInfo shardInfo, IRecordProcessorFactory factory) { @@ -688,15 +785,15 @@ public class Worker implements Runnable { return new ShardConsumer(shardInfo, streamConfig, checkpointTracker, recordProcessor, leaseCoordinator.getLeaseManager(), parentShardPollIntervalMillis, cleanupLeasesUponShardCompletion, - executorService, metricsFactory, taskBackoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist); + executorService, metricsFactory, taskBackoffTimeMillis, + skipShardSyncAtWorkerInitializationIfLeasesExist); } /** - * Logger for suppressing too much INFO logging. To avoid too much logging - * information Worker will output logging at INFO level for a single pass - * through the main loop every minute. At DEBUG level it will output all - * INFO logs on every pass. + * Logger for suppressing too much INFO logging. To avoid too much logging information Worker will output logging at + * INFO level for a single pass through the main loop every minute. At DEBUG level it will output all INFO logs on + * every pass. */ private static class WorkerLog { @@ -752,90 +849,89 @@ public class Worker implements Runnable { // Backwards compatible constructors /** - * This constructor is for binary compatibility with code compiled against - * version of the KCL that only have constructors taking "Client" objects. + * This constructor is for binary compatibility with code compiled against version of the KCL that only have + * constructors taking "Client" objects. * - * @param recordProcessorFactory Used to get record processor instances for processing data from shards - * @param config Kinesis Client Library configuration - * @param kinesisClient Kinesis Client used for fetching data - * @param dynamoDBClient DynamoDB client used for checkpoints and tracking leases - * @param cloudWatchClient CloudWatch Client for publishing metrics + * @param recordProcessorFactory + * Used to get record processor instances for processing data from shards + * @param config + * Kinesis Client Library configuration + * @param kinesisClient + * Kinesis Client used for fetching data + * @param dynamoDBClient + * DynamoDB client used for checkpoints and tracking leases + * @param cloudWatchClient + * CloudWatch Client for publishing metrics */ public Worker( com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, - KinesisClientLibConfiguration config, - AmazonKinesisClient kinesisClient, - AmazonDynamoDBClient dynamoDBClient, - AmazonCloudWatchClient cloudWatchClient) { - this(recordProcessorFactory, - config, - (AmazonKinesis) kinesisClient, - (AmazonDynamoDB) dynamoDBClient, + KinesisClientLibConfiguration config, AmazonKinesisClient kinesisClient, + AmazonDynamoDBClient dynamoDBClient, AmazonCloudWatchClient cloudWatchClient) { + this(recordProcessorFactory, config, (AmazonKinesis) kinesisClient, (AmazonDynamoDB) dynamoDBClient, (AmazonCloudWatch) cloudWatchClient); } /** - * This constructor is for binary compatibility with code compiled against - * version of the KCL that only have constructors taking "Client" objects. + * This constructor is for binary compatibility with code compiled against version of the KCL that only have + * constructors taking "Client" objects. * - * @param recordProcessorFactory Used to get record processor instances for processing data from shards - * @param config Kinesis Client Library configuration - * @param kinesisClient Kinesis Client used for fetching data - * @param dynamoDBClient DynamoDB client used for checkpoints and tracking leases - * @param cloudWatchClient CloudWatch Client for publishing metrics - * @param execService ExecutorService to use for processing records (support for multi-threaded - * consumption) + * @param recordProcessorFactory + * Used to get record processor instances for processing data from shards + * @param config + * Kinesis Client Library configuration + * @param kinesisClient + * Kinesis Client used for fetching data + * @param dynamoDBClient + * DynamoDB client used for checkpoints and tracking leases + * @param cloudWatchClient + * CloudWatch Client for publishing metrics + * @param execService + * ExecutorService to use for processing records (support for multi-threaded consumption) */ public Worker( com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, - KinesisClientLibConfiguration config, - AmazonKinesisClient kinesisClient, - AmazonDynamoDBClient dynamoDBClient, - AmazonCloudWatchClient cloudWatchClient, - ExecutorService execService) { - this(recordProcessorFactory, - config, - (AmazonKinesis) kinesisClient, - (AmazonDynamoDB) dynamoDBClient, - (AmazonCloudWatch) cloudWatchClient, - execService); + KinesisClientLibConfiguration config, AmazonKinesisClient kinesisClient, + AmazonDynamoDBClient dynamoDBClient, AmazonCloudWatchClient cloudWatchClient, ExecutorService execService) { + this(recordProcessorFactory, config, (AmazonKinesis) kinesisClient, (AmazonDynamoDB) dynamoDBClient, + (AmazonCloudWatch) cloudWatchClient, execService); } /** - * This constructor is for binary compatibility with code compiled against - * version of the KCL that only have constructors taking "Client" objects. + * This constructor is for binary compatibility with code compiled against version of the KCL that only have + * constructors taking "Client" objects. * - * @param recordProcessorFactory Used to get record processor instances for processing data from shards - * @param config Kinesis Client Library configuration - * @param kinesisClient Kinesis Client used for fetching data - * @param dynamoDBClient DynamoDB client used for checkpoints and tracking leases - * @param metricsFactory Metrics factory used to emit metrics - * @param execService ExecutorService to use for processing records (support for multi-threaded - * consumption) + * @param recordProcessorFactory + * Used to get record processor instances for processing data from shards + * @param config + * Kinesis Client Library configuration + * @param kinesisClient + * Kinesis Client used for fetching data + * @param dynamoDBClient + * DynamoDB client used for checkpoints and tracking leases + * @param metricsFactory + * Metrics factory used to emit metrics + * @param execService + * ExecutorService to use for processing records (support for multi-threaded consumption) */ public Worker( com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, - KinesisClientLibConfiguration config, - AmazonKinesisClient kinesisClient, - AmazonDynamoDBClient dynamoDBClient, - IMetricsFactory metricsFactory, - ExecutorService execService) { - this(recordProcessorFactory, - config, - (AmazonKinesis) kinesisClient, - (AmazonDynamoDB) dynamoDBClient, - metricsFactory, - execService); + KinesisClientLibConfiguration config, AmazonKinesisClient kinesisClient, + AmazonDynamoDBClient dynamoDBClient, IMetricsFactory metricsFactory, ExecutorService execService) { + this(recordProcessorFactory, config, (AmazonKinesis) kinesisClient, (AmazonDynamoDB) dynamoDBClient, + metricsFactory, execService); } /** * Given configuration, returns appropriate metrics factory. - * @param cloudWatchClient Amazon CloudWatch client - * @param config KinesisClientLibConfiguration + * + * @param cloudWatchClient + * Amazon CloudWatch client + * @param config + * KinesisClientLibConfiguration * @return Returns metrics factory based on the config. */ - private static IMetricsFactory getMetricsFactory( - AmazonCloudWatch cloudWatchClient, KinesisClientLibConfiguration config) { + private static IMetricsFactory getMetricsFactory(AmazonCloudWatch cloudWatchClient, + KinesisClientLibConfiguration config) { IMetricsFactory metricsFactory; if (config.getMetricsLevel() == MetricsLevel.NONE) { metricsFactory = new NullMetricsFactory(); @@ -845,12 +941,8 @@ public class Worker implements Runnable { cloudWatchClient.setRegion(region); LOG.debug("The region of Amazon CloudWatch client has been set to " + config.getRegionName()); } - metricsFactory = new WorkerCWMetricsFactory( - cloudWatchClient, - config.getApplicationName(), - config.getMetricsBufferTimeMillis(), - config.getMetricsMaxQueueSize(), - config.getMetricsLevel(), + metricsFactory = new WorkerCWMetricsFactory(cloudWatchClient, config.getApplicationName(), + config.getMetricsBufferTimeMillis(), config.getMetricsMaxQueueSize(), config.getMetricsLevel(), config.getMetricsEnabledDimensions()); } return metricsFactory; @@ -858,6 +950,7 @@ public class Worker implements Runnable { /** * Returns default executor service that should be used by the worker. + * * @return Default executor service that should be used by the worker. */ private static ExecutorService getExecutorService() { @@ -866,26 +959,19 @@ public class Worker implements Runnable { } /** - * Extension to CWMetricsFactory, so worker can identify whether it owns the metrics factory instance - * or not. + * Extension to CWMetricsFactory, so worker can identify whether it owns the metrics factory instance or not. * Visible and non-final only for testing. */ static class WorkerCWMetricsFactory extends CWMetricsFactory { - WorkerCWMetricsFactory(AmazonCloudWatch cloudWatchClient, - String namespace, - long bufferTimeMillis, - int maxQueueSize, - MetricsLevel metricsLevel, - Set metricsEnabledDimensions) { - super(cloudWatchClient, namespace, bufferTimeMillis, - maxQueueSize, metricsLevel, metricsEnabledDimensions); + WorkerCWMetricsFactory(AmazonCloudWatch cloudWatchClient, String namespace, long bufferTimeMillis, + int maxQueueSize, MetricsLevel metricsLevel, Set metricsEnabledDimensions) { + super(cloudWatchClient, namespace, bufferTimeMillis, maxQueueSize, metricsLevel, metricsEnabledDimensions); } } /** - * Extension to ThreadPoolExecutor, so worker can identify whether it owns the executor service instance - * or not. + * Extension to ThreadPoolExecutor, so worker can identify whether it owns the executor service instance or not. * Visible and non-final only for testing. */ static class WorkerThreadPoolExecutor extends ThreadPoolExecutor { @@ -919,24 +1005,25 @@ public class Worker implements Runnable { } /** - * Provide a V1 - * {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor IRecordProcessor}. + * Provide a V1 {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor + * IRecordProcessor}. * - * @param recordProcessorFactory Used to get record processor instances for processing data from shards + * @param recordProcessorFactory + * Used to get record processor instances for processing data from shards * @return A reference to this updated object so that method calls can be chained together. */ public Builder recordProcessorFactory( - com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory - recordProcessorFactory) { + com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory) { this.recordProcessorFactory = new V1ToV2RecordProcessorFactoryAdapter(recordProcessorFactory); return this; } /** - * Provide a V2 - * {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor IRecordProcessor}. + * Provide a V2 {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor + * IRecordProcessor}. * - * @param recordProcessorFactory Used to get record processor instances for processing data from shards + * @param recordProcessorFactory + * Used to get record processor instances for processing data from shards * @return A reference to this updated object so that method calls can be chained together. */ public Builder recordProcessorFactory(IRecordProcessorFactory recordProcessorFactory) { @@ -947,7 +1034,8 @@ public class Worker implements Runnable { /** * Set the Worker config. * - * @param config Kinesis Client Library configuration + * @param config + * Kinesis Client Library configuration * @return A reference to this updated object so that method calls can be chained together. */ public Builder config(KinesisClientLibConfiguration config) { @@ -958,7 +1046,8 @@ public class Worker implements Runnable { /** * Set the Kinesis client. * - * @param kinesisClient Kinesis Client used for fetching data + * @param kinesisClient + * Kinesis Client used for fetching data * @return A reference to this updated object so that method calls can be chained together. */ public Builder kinesisClient(AmazonKinesis kinesisClient) { @@ -969,7 +1058,8 @@ public class Worker implements Runnable { /** * Set the DynamoDB client. * - * @param dynamoDBClient DynamoDB client used for checkpoints and tracking leases + * @param dynamoDBClient + * DynamoDB client used for checkpoints and tracking leases * @return A reference to this updated object so that method calls can be chained together. */ public Builder dynamoDBClient(AmazonDynamoDB dynamoDBClient) { @@ -980,7 +1070,8 @@ public class Worker implements Runnable { /** * Set the Cloudwatch client. * - * @param cloudWatchClient CloudWatch Client for publishing metrics + * @param cloudWatchClient + * CloudWatch Client for publishing metrics * @return A reference to this updated object so that method calls can be chained together. */ public Builder cloudWatchClient(AmazonCloudWatch cloudWatchClient) { @@ -991,7 +1082,8 @@ public class Worker implements Runnable { /** * Set the metrics factory. * - * @param metricsFactory Metrics factory used to emit metrics + * @param metricsFactory + * Metrics factory used to emit metrics * @return A reference to this updated object so that method calls can be chained together. */ public Builder metricsFactory(IMetricsFactory metricsFactory) { @@ -1002,7 +1094,8 @@ public class Worker implements Runnable { /** * Set the executor service for processing records. * - * @param execService ExecutorService to use for processing records (support for multi-threaded consumption) + * @param execService + * ExecutorService to use for processing records (support for multi-threaded consumption) * @return A reference to this updated object so that method calls can be chained together. */ public Builder execService(ExecutorService execService) { @@ -1036,8 +1129,7 @@ public class Worker implements Runnable { "Kinesis Client Library configuration needs to be provided to build Worker"); } if (recordProcessorFactory == null) { - throw new IllegalArgumentException( - "A Record Processor Factory needs to be provided to build Worker"); + throw new IllegalArgumentException("A Record Processor Factory needs to be provided to build Worker"); } if (execService == null) { @@ -1079,7 +1171,7 @@ public class Worker implements Runnable { + ". Amazon Kinesis endpoint will overwrite region name."); LOG.debug("The region of Amazon Kinesis client has been overwritten to " + config.getKinesisEndpoint()); - } else { + } else { LOG.debug("The region of Amazon Kinesis client has been set to " + config.getKinesisEndpoint()); } } @@ -1090,6 +1182,7 @@ public class Worker implements Runnable { shardPrioritization = new ParentsFirstShardPrioritization(1); } + return new Worker(config.getApplicationName(), recordProcessorFactory, new StreamConfig(new KinesisProxyFactory(config.getKinesisCredentialsProvider(), diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/Messages.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/Messages.java index 87545b7e..a467ee57 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/Messages.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/Messages.java @@ -1,3 +1,18 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + // Generated by the protocol buffer compiler. DO NOT EDIT! // source: messages.proto diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/utils/NamedThreadFactory.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/utils/NamedThreadFactory.java index 29d8a7be..4be5a092 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/utils/NamedThreadFactory.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/utils/NamedThreadFactory.java @@ -1,3 +1,17 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ package com.amazonaws.services.kinesis.clientlibrary.utils; import java.util.concurrent.Executors; diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java index c0a778e9..31272379 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java @@ -1,3 +1,17 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerStates.ConsumerState; diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GracefulShutdownCoordinatorTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GracefulShutdownCoordinatorTest.java new file mode 100644 index 00000000..c032bf0c --- /dev/null +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GracefulShutdownCoordinatorTest.java @@ -0,0 +1,322 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.not; +import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; +import org.mockito.verification.VerificationMode; + +@RunWith(MockitoJUnitRunner.class) +public class GracefulShutdownCoordinatorTest { + + @Mock + private CountDownLatch shutdownCompleteLatch; + @Mock + private CountDownLatch notificationCompleteLatch; + @Mock + private Worker worker; + @Mock + private Callable contextCallable; + @Mock + private ConcurrentMap shardInfoConsumerMap; + + @Test + public void testAllShutdownCompletedAlready() throws Exception { + Callable requestedShutdownCallable = buildRequestedShutdownCallable(); + + when(shutdownCompleteLatch.await(anyLong(), any(TimeUnit.class))).thenReturn(true); + when(notificationCompleteLatch.await(anyLong(), any(TimeUnit.class))).thenReturn(true); + + assertThat(requestedShutdownCallable.call(), equalTo(true)); + verify(shutdownCompleteLatch).await(anyLong(), any(TimeUnit.class)); + verify(notificationCompleteLatch).await(anyLong(), any(TimeUnit.class)); + verify(worker).shutdown(); + } + + @Test + public void testNotificationNotCompletedYet() throws Exception { + Callable requestedShutdownCallable = buildRequestedShutdownCallable(); + + mockLatchAwait(notificationCompleteLatch, false, true); + when(notificationCompleteLatch.getCount()).thenReturn(1L, 0L); + mockLatchAwait(shutdownCompleteLatch, true); + when(shutdownCompleteLatch.getCount()).thenReturn(1L, 1L, 0L); + + when(worker.isShutdownComplete()).thenReturn(false, true); + mockShardInfoConsumerMap(1, 0); + + assertThat(requestedShutdownCallable.call(), equalTo(true)); + verify(notificationCompleteLatch, times(2)).await(anyLong(), any(TimeUnit.class)); + verify(notificationCompleteLatch).getCount(); + + verify(shutdownCompleteLatch).await(anyLong(), any(TimeUnit.class)); + verify(shutdownCompleteLatch, times(2)).getCount(); + + verify(worker).shutdown(); + } + + @Test + public void testShutdownNotCompletedYet() throws Exception { + Callable requestedShutdownCallable = buildRequestedShutdownCallable(); + + mockLatchAwait(notificationCompleteLatch, true); + mockLatchAwait(shutdownCompleteLatch, false, true); + when(shutdownCompleteLatch.getCount()).thenReturn(1L, 0L); + + when(worker.isShutdownComplete()).thenReturn(false, true); + mockShardInfoConsumerMap(1, 0); + + assertThat(requestedShutdownCallable.call(), equalTo(true)); + verify(notificationCompleteLatch).await(anyLong(), any(TimeUnit.class)); + verify(notificationCompleteLatch, never()).getCount(); + + verify(shutdownCompleteLatch, times(2)).await(anyLong(), any(TimeUnit.class)); + verify(shutdownCompleteLatch, times(2)).getCount(); + + verify(worker).shutdown(); + } + + @Test + public void testMultipleAttemptsForNotification() throws Exception { + Callable requestedShutdownCallable = buildRequestedShutdownCallable(); + + mockLatchAwait(notificationCompleteLatch, false, false, true); + when(notificationCompleteLatch.getCount()).thenReturn(2L, 1L, 0L); + + mockLatchAwait(shutdownCompleteLatch, true); + when(shutdownCompleteLatch.getCount()).thenReturn(2L, 2L, 1L, 1L, 0L); + + when(worker.isShutdownComplete()).thenReturn(false, false, false, true); + mockShardInfoConsumerMap(2, 1, 0); + + assertThat(requestedShutdownCallable.call(), equalTo(true)); + + verifyLatchAwait(notificationCompleteLatch, 3); + verify(notificationCompleteLatch, times(2)).getCount(); + + verifyLatchAwait(shutdownCompleteLatch, 1); + verify(shutdownCompleteLatch, times(4)).getCount(); + } + + @Test + public void testWorkerAlreadyShutdownAtNotification() throws Exception { + Callable requestedShutdownCallable = buildRequestedShutdownCallable(); + + mockLatchAwait(notificationCompleteLatch, false, true); + when(notificationCompleteLatch.getCount()).thenReturn(1L, 0L); + + mockLatchAwait(shutdownCompleteLatch, true); + when(shutdownCompleteLatch.getCount()).thenReturn(1L, 1L, 0L); + + when(worker.isShutdownComplete()).thenReturn(true); + mockShardInfoConsumerMap(0); + + assertThat(requestedShutdownCallable.call(), equalTo(false)); + + verifyLatchAwait(notificationCompleteLatch); + verify(notificationCompleteLatch).getCount(); + + verifyLatchAwait(shutdownCompleteLatch, never()); + verify(shutdownCompleteLatch, times(3)).getCount(); + } + + @Test + public void testWorkerAlreadyShutdownAtComplete() throws Exception { + Callable requestedShutdownCallable = buildRequestedShutdownCallable(); + + mockLatchAwait(notificationCompleteLatch, true); + + mockLatchAwait(shutdownCompleteLatch, false, true); + when(shutdownCompleteLatch.getCount()).thenReturn(1L, 1L, 1L); + + when(worker.isShutdownComplete()).thenReturn(true); + mockShardInfoConsumerMap(0); + + assertThat(requestedShutdownCallable.call(), equalTo(false)); + + verifyLatchAwait(notificationCompleteLatch); + verify(notificationCompleteLatch, never()).getCount(); + + verifyLatchAwait(shutdownCompleteLatch); + verify(shutdownCompleteLatch, times(3)).getCount(); + } + + @Test + public void testNotificationInterrupted() throws Exception { + Callable requestedShutdownCallable = buildRequestedShutdownCallable(); + + when(notificationCompleteLatch.await(anyLong(), any(TimeUnit.class))).thenThrow(new InterruptedException()); + when(notificationCompleteLatch.getCount()).thenReturn(1L); + + when(shutdownCompleteLatch.getCount()).thenReturn(1L); + + assertThat(requestedShutdownCallable.call(), equalTo(false)); + verifyLatchAwait(notificationCompleteLatch); + verifyLatchAwait(shutdownCompleteLatch, never()); + verify(worker, never()).shutdown(); + } + + @Test + public void testShutdownInterrupted() throws Exception { + Callable requestedShutdownCallable = buildRequestedShutdownCallable(); + + when(notificationCompleteLatch.await(anyLong(), any(TimeUnit.class))).thenReturn(true); + + when(shutdownCompleteLatch.await(anyLong(), any(TimeUnit.class))).thenThrow(new InterruptedException()); + when(shutdownCompleteLatch.getCount()).thenReturn(1L); + + assertThat(requestedShutdownCallable.call(), equalTo(false)); + verifyLatchAwait(notificationCompleteLatch); + verifyLatchAwait(shutdownCompleteLatch); + verify(worker).shutdown(); + } + + @Test + public void testInterruptedAfterNotification() throws Exception { + Callable requestedShutdownCallable = buildRequestedShutdownCallable(); + + when(notificationCompleteLatch.await(anyLong(), any(TimeUnit.class))).thenAnswer(invocation -> { + Thread.currentThread().interrupt(); + return true; + }); + + assertThat(requestedShutdownCallable.call(), equalTo(false)); + verifyLatchAwait(notificationCompleteLatch); + verifyLatchAwait(shutdownCompleteLatch, never()); + verify(worker, never()).shutdown(); + } + + @Test + public void testInterruptedAfterWorkerShutdown() throws Exception { + Callable requestedShutdownCallable = buildRequestedShutdownCallable(); + + when(notificationCompleteLatch.await(anyLong(), any(TimeUnit.class))).thenReturn(true); + + doAnswer(invocation -> { + Thread.currentThread().interrupt(); + return true; + }).when(worker).shutdown(); + + assertThat(requestedShutdownCallable.call(), equalTo(false)); + verifyLatchAwait(notificationCompleteLatch); + verifyLatchAwait(shutdownCompleteLatch, never()); + verify(worker).shutdown(); + } + + @Test + public void testInterruptedDuringNotification() throws Exception { + Callable requestedShutdownCallable = buildRequestedShutdownCallable(); + + when(notificationCompleteLatch.await(anyLong(), any(TimeUnit.class))).thenAnswer(invocation -> { + Thread.currentThread().interrupt(); + return false; + }); + when(notificationCompleteLatch.getCount()).thenReturn(1L); + + when(shutdownCompleteLatch.getCount()).thenReturn(1L); + + assertThat(requestedShutdownCallable.call(), equalTo(false)); + verifyLatchAwait(notificationCompleteLatch); + verify(notificationCompleteLatch).getCount(); + + verifyLatchAwait(shutdownCompleteLatch, never()); + verify(shutdownCompleteLatch).getCount(); + + verify(worker, never()).shutdown(); + } + + @Test + public void testInterruptedDuringShutdown() throws Exception { + Callable requestedShutdownCallable = buildRequestedShutdownCallable(); + + when(notificationCompleteLatch.await(anyLong(), any(TimeUnit.class))).thenReturn(true); + + when(shutdownCompleteLatch.await(anyLong(), any(TimeUnit.class))).thenAnswer(invocation -> { + Thread.currentThread().interrupt(); + return false; + }); + when(shutdownCompleteLatch.getCount()).thenReturn(1L); + + assertThat(requestedShutdownCallable.call(), equalTo(false)); + verifyLatchAwait(notificationCompleteLatch); + verify(notificationCompleteLatch, never()).getCount(); + + verifyLatchAwait(shutdownCompleteLatch); + verify(shutdownCompleteLatch).getCount(); + + verify(worker).shutdown(); + } + + @Test(expected = IllegalStateException.class) + public void testWorkerShutdownCallableThrows() throws Exception { + Callable requestedShutdownCallable = new GracefulShutdownCoordinator().createGracefulShutdownCallable(contextCallable); + when(contextCallable.call()).thenThrow(new IllegalStateException("Bad Shutdown")); + + requestedShutdownCallable.call(); + } + + private void verifyLatchAwait(CountDownLatch latch) throws Exception { + verifyLatchAwait(latch, times(1)); + } + + private void verifyLatchAwait(CountDownLatch latch, int times) throws Exception { + verifyLatchAwait(latch, times(times)); + } + + private void verifyLatchAwait(CountDownLatch latch, VerificationMode verificationMode) throws Exception { + verify(latch, verificationMode).await(anyLong(), any(TimeUnit.class)); + } + + private void mockLatchAwait(CountDownLatch latch, Boolean initial, Boolean... remaining) throws Exception { + when(latch.await(anyLong(), any(TimeUnit.class))).thenReturn(initial, remaining); + } + + private Callable buildRequestedShutdownCallable() throws Exception { + GracefulShutdownContext context = new GracefulShutdownContext(shutdownCompleteLatch, + notificationCompleteLatch, worker); + when(contextCallable.call()).thenReturn(context); + return new GracefulShutdownCoordinator().createGracefulShutdownCallable(contextCallable); + } + + private void mockShardInfoConsumerMap(Integer initialItemCount, Integer... additionalItemCounts) { + when(worker.getShardInfoShardConsumerMap()).thenReturn(shardInfoConsumerMap); + Boolean additionalEmptyStates[] = new Boolean[additionalItemCounts.length]; + for (int i = 0; i < additionalItemCounts.length; ++i) { + additionalEmptyStates[i] = additionalItemCounts[i] == 0; + } + when(shardInfoConsumerMap.size()).thenReturn(initialItemCount, additionalItemCounts); + when(shardInfoConsumerMap.isEmpty()).thenReturn(initialItemCount == 0, additionalEmptyStates); + } + +} \ No newline at end of file diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ParentsFirstShardPrioritizationUnitTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ParentsFirstShardPrioritizationUnitTest.java index 7ba0753d..42fd82de 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ParentsFirstShardPrioritizationUnitTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ParentsFirstShardPrioritizationUnitTest.java @@ -1,3 +1,17 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import static org.junit.Assert.assertEquals; diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownFutureTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownFutureTest.java deleted file mode 100644 index cccbc9a1..00000000 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownFutureTest.java +++ /dev/null @@ -1,236 +0,0 @@ -package com.amazonaws.services.kinesis.clientlibrary.lib.worker; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyLong; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.runners.MockitoJUnitRunner; -import org.mockito.stubbing.Answer; -import org.mockito.stubbing.OngoingStubbing; - -@RunWith(MockitoJUnitRunner.class) -public class ShutdownFutureTest { - - @Mock - private CountDownLatch shutdownCompleteLatch; - @Mock - private CountDownLatch notificationCompleteLatch; - @Mock - private Worker worker; - @Mock - private ConcurrentMap shardInfoConsumerMap; - - @Test - public void testSimpleGetAlreadyCompleted() throws Exception { - ShutdownFuture future = new ShutdownFuture(shutdownCompleteLatch, notificationCompleteLatch, worker); - - mockNotificationComplete(true); - mockShutdownComplete(true); - - future.get(); - - verify(notificationCompleteLatch).await(anyLong(), any(TimeUnit.class)); - verify(worker).shutdown(); - verify(shutdownCompleteLatch).await(anyLong(), any(TimeUnit.class)); - } - - @Test - public void testNotificationNotCompleted() throws Exception { - ShutdownFuture future = new ShutdownFuture(shutdownCompleteLatch, notificationCompleteLatch, worker); - - mockNotificationComplete(false, true); - mockShutdownComplete(true); - - when(worker.getShardInfoShardConsumerMap()).thenReturn(shardInfoConsumerMap); - when(shardInfoConsumerMap.isEmpty()).thenReturn(false); - when(worker.isShutdownComplete()).thenReturn(false); - - when(notificationCompleteLatch.getCount()).thenReturn(1L); - when(shutdownCompleteLatch.getCount()).thenReturn(1L); - - expectedTimeoutException(future); - - verify(worker, never()).shutdown(); - - awaitFuture(future); - - verify(notificationCompleteLatch).getCount(); - verifyLatchAwait(notificationCompleteLatch, 2); - - verify(shutdownCompleteLatch).getCount(); - verifyLatchAwait(shutdownCompleteLatch); - - verify(worker).shutdown(); - - } - - @Test - public void testShutdownNotCompleted() throws Exception { - ShutdownFuture future = new ShutdownFuture(shutdownCompleteLatch, notificationCompleteLatch, worker); - mockNotificationComplete(true); - mockShutdownComplete(false, true); - - when(shutdownCompleteLatch.getCount()).thenReturn(1L); - when(worker.isShutdownComplete()).thenReturn(false); - - mockShardInfoConsumerMap(1); - - expectedTimeoutException(future); - verify(worker).shutdown(); - awaitFuture(future); - - verifyLatchAwait(notificationCompleteLatch, 2); - verifyLatchAwait(shutdownCompleteLatch, 2); - - verify(worker).isShutdownComplete(); - verify(worker).getShardInfoShardConsumerMap(); - - } - - @Test - public void testShutdownNotCompleteButWorkerShutdown() throws Exception { - ShutdownFuture future = create(); - - mockNotificationComplete(true); - mockShutdownComplete(false); - - when(shutdownCompleteLatch.getCount()).thenReturn(1L); - when(worker.isShutdownComplete()).thenReturn(true); - mockShardInfoConsumerMap(1); - - awaitFuture(future); - verify(worker).shutdown(); - verifyLatchAwait(notificationCompleteLatch); - verifyLatchAwait(shutdownCompleteLatch); - - verify(worker, times(2)).isShutdownComplete(); - verify(worker).getShardInfoShardConsumerMap(); - verify(shardInfoConsumerMap).size(); - } - - @Test - public void testShutdownNotCompleteButShardConsumerEmpty() throws Exception { - ShutdownFuture future = create(); - mockNotificationComplete(true); - mockShutdownComplete(false); - - mockOutstanding(shutdownCompleteLatch, 1L); - - when(worker.isShutdownComplete()).thenReturn(false); - mockShardInfoConsumerMap(0); - - awaitFuture(future); - verify(worker).shutdown(); - verifyLatchAwait(notificationCompleteLatch); - verifyLatchAwait(shutdownCompleteLatch); - - verify(worker, times(2)).isShutdownComplete(); - verify(worker, times(2)).getShardInfoShardConsumerMap(); - - verify(shardInfoConsumerMap).isEmpty(); - verify(shardInfoConsumerMap).size(); - } - - @Test - public void testNotificationNotCompleteButShardConsumerEmpty() throws Exception { - ShutdownFuture future = create(); - mockNotificationComplete(false); - mockShutdownComplete(false); - - mockOutstanding(notificationCompleteLatch, 1L); - mockOutstanding(shutdownCompleteLatch, 1L); - - when(worker.isShutdownComplete()).thenReturn(false); - mockShardInfoConsumerMap(0); - - awaitFuture(future); - verify(worker, never()).shutdown(); - verifyLatchAwait(notificationCompleteLatch); - verify(shutdownCompleteLatch, never()).await(); - - verify(worker, times(2)).isShutdownComplete(); - verify(worker, times(2)).getShardInfoShardConsumerMap(); - - verify(shardInfoConsumerMap).isEmpty(); - verify(shardInfoConsumerMap).size(); - } - - @Test(expected = TimeoutException.class) - public void testTimeExceededException() throws Exception { - ShutdownFuture future = create(); - mockNotificationComplete(false); - mockOutstanding(notificationCompleteLatch, 1L); - when(worker.isShutdownComplete()).thenReturn(false); - mockShardInfoConsumerMap(1); - - future.get(1, TimeUnit.NANOSECONDS); - } - - private ShutdownFuture create() { - return new ShutdownFuture(shutdownCompleteLatch, notificationCompleteLatch, worker); - } - - private void mockShardInfoConsumerMap(Integer initialItemCount, Integer ... additionalItemCounts) { - when(worker.getShardInfoShardConsumerMap()).thenReturn(shardInfoConsumerMap); - Boolean additionalEmptyStates[] = new Boolean[additionalItemCounts.length]; - for(int i = 0; i < additionalItemCounts.length; ++i) { - additionalEmptyStates[i] = additionalItemCounts[i] == 0; - } - when(shardInfoConsumerMap.size()).thenReturn(initialItemCount, additionalItemCounts); - when(shardInfoConsumerMap.isEmpty()).thenReturn(initialItemCount == 0, additionalEmptyStates); - } - - private void verifyLatchAwait(CountDownLatch latch) throws Exception { - verifyLatchAwait(latch, 1); - } - - private void verifyLatchAwait(CountDownLatch latch, int times) throws Exception { - verify(latch, times(times)).await(anyLong(), any(TimeUnit.class)); - } - - private void expectedTimeoutException(ShutdownFuture future) throws Exception { - boolean gotTimeout = false; - try { - awaitFuture(future); - } catch (TimeoutException te) { - gotTimeout = true; - } - assertThat("Expected a timeout exception to occur", gotTimeout); - } - - private void awaitFuture(ShutdownFuture future) throws Exception { - future.get(1, TimeUnit.SECONDS); - } - - private void mockNotificationComplete(Boolean initial, Boolean... states) throws Exception { - mockLatch(notificationCompleteLatch, initial, states); - - } - - private void mockShutdownComplete(Boolean initial, Boolean... states) throws Exception { - mockLatch(shutdownCompleteLatch, initial, states); - } - - private void mockLatch(CountDownLatch latch, Boolean initial, Boolean... states) throws Exception { - when(latch.await(anyLong(), any(TimeUnit.class))).thenReturn(initial, states); - } - - private void mockOutstanding(CountDownLatch latch, Long remaining, Long ... additionalRemaining) throws Exception { - when(latch.getCount()).thenReturn(remaining, additionalRemaining); - } - -} \ No newline at end of file diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ThrottlingReporterTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ThrottlingReporterTest.java index d0645229..79118ac9 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ThrottlingReporterTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ThrottlingReporterTest.java @@ -1,3 +1,17 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import static org.mockito.Matchers.any; diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index daf58165..5913bf0d 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -1,16 +1,16 @@ /* - * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at * - * http://aws.amazon.com/asl/ + * http://aws.amazon.com/asl/ * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; @@ -762,7 +762,7 @@ public class WorkerTest { verify(executorService, atLeastOnce()).submit(argThat( both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.INITIALIZE)))); - worker.requestShutdown(); + worker.createWorkerShutdownCallable().call(); worker.runProcessLoop(); verify(executorService, atLeastOnce()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class)) @@ -781,6 +781,146 @@ public class WorkerTest { } + @Test(expected = IllegalStateException.class) + public void testShutdownCallableNotAllowedTwice() throws Exception { + + IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + StreamConfig streamConfig = mock(StreamConfig.class); + IMetricsFactory metricsFactory = mock(IMetricsFactory.class); + + ExtendedSequenceNumber checkpoint = new ExtendedSequenceNumber("123", 0L); + KinesisClientLeaseBuilder builder = new KinesisClientLeaseBuilder().withCheckpoint(checkpoint) + .withConcurrencyToken(UUID.randomUUID()).withLastCounterIncrementNanos(0L).withLeaseCounter(0L) + .withOwnerSwitchesSinceCheckpoint(0L).withLeaseOwner("Self"); + + final List leases = new ArrayList<>(); + final List currentAssignments = new ArrayList<>(); + KinesisClientLease lease = builder.withLeaseKey(String.format("shardId-%03d", 1)).build(); + leases.add(lease); + currentAssignments.add(new ShardInfo(lease.getLeaseKey(), lease.getConcurrencyToken().toString(), + lease.getParentShardIds(), lease.getCheckpoint())); + + when(leaseCoordinator.getAssignments()).thenAnswer(new Answer>() { + @Override + public List answer(InvocationOnMock invocation) throws Throwable { + return leases; + } + }); + when(leaseCoordinator.getCurrentAssignments()).thenAnswer(new Answer>() { + @Override + public List answer(InvocationOnMock invocation) throws Throwable { + return currentAssignments; + } + }); + + IRecordProcessor processor = mock(IRecordProcessor.class); + when(recordProcessorFactory.createProcessor()).thenReturn(processor); + + Worker worker = new InjectableWorker("testRequestShutdown", recordProcessorFactory, streamConfig, + INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, + cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, + taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization) { + @Override + void postConstruct() { + this.gracefuleShutdownStarted = true; + } + }; + + when(executorService.submit(Matchers.> any())) + .thenAnswer(new ShutdownHandlingAnswer(taskFuture)); + when(taskFuture.isDone()).thenReturn(true); + when(taskFuture.get()).thenReturn(taskResult); + + worker.runProcessLoop(); + + verify(executorService, atLeastOnce()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class)) + .and(TaskTypeMatcher.isOfType(TaskType.BLOCK_ON_PARENT_SHARDS)))); + + worker.runProcessLoop(); + + verify(executorService, atLeastOnce()).submit(argThat( + both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.INITIALIZE)))); + + assertThat(worker.hasGracefulShutdownStarted(), equalTo(true)); + worker.createWorkerShutdownCallable().call(); + + } + + @Test + public void testGracefulShutdownSingleFuture() throws Exception { + + IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + StreamConfig streamConfig = mock(StreamConfig.class); + IMetricsFactory metricsFactory = mock(IMetricsFactory.class); + + ExtendedSequenceNumber checkpoint = new ExtendedSequenceNumber("123", 0L); + KinesisClientLeaseBuilder builder = new KinesisClientLeaseBuilder().withCheckpoint(checkpoint) + .withConcurrencyToken(UUID.randomUUID()).withLastCounterIncrementNanos(0L).withLeaseCounter(0L) + .withOwnerSwitchesSinceCheckpoint(0L).withLeaseOwner("Self"); + + final List leases = new ArrayList<>(); + final List currentAssignments = new ArrayList<>(); + KinesisClientLease lease = builder.withLeaseKey(String.format("shardId-%03d", 1)).build(); + leases.add(lease); + currentAssignments.add(new ShardInfo(lease.getLeaseKey(), lease.getConcurrencyToken().toString(), + lease.getParentShardIds(), lease.getCheckpoint())); + + when(leaseCoordinator.getAssignments()).thenAnswer(new Answer>() { + @Override + public List answer(InvocationOnMock invocation) throws Throwable { + return leases; + } + }); + when(leaseCoordinator.getCurrentAssignments()).thenAnswer(new Answer>() { + @Override + public List answer(InvocationOnMock invocation) throws Throwable { + return currentAssignments; + } + }); + + IRecordProcessor processor = mock(IRecordProcessor.class); + when(recordProcessorFactory.createProcessor()).thenReturn(processor); + + GracefulShutdownCoordinator coordinator = mock(GracefulShutdownCoordinator.class); + when(coordinator.createGracefulShutdownCallable(any(Callable.class))).thenReturn(() -> true); + + Future gracefulShutdownFuture = mock(Future.class); + + when(coordinator.startGracefulShutdown(any(Callable.class))).thenReturn(gracefulShutdownFuture); + + Worker worker = new InjectableWorker("testRequestShutdown", recordProcessorFactory, streamConfig, + INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, + cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, + taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization) { + @Override + void postConstruct() { + this.gracefulShutdownCoordinator = coordinator; + } + }; + + when(executorService.submit(Matchers.> any())) + .thenAnswer(new ShutdownHandlingAnswer(taskFuture)); + when(taskFuture.isDone()).thenReturn(true); + when(taskFuture.get()).thenReturn(taskResult); + + worker.runProcessLoop(); + + verify(executorService, atLeastOnce()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class)) + .and(TaskTypeMatcher.isOfType(TaskType.BLOCK_ON_PARENT_SHARDS)))); + + worker.runProcessLoop(); + + verify(executorService, atLeastOnce()).submit(argThat( + both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.INITIALIZE)))); + + Future firstFuture = worker.startGracefulShutdown(); + Future secondFuture = worker.startGracefulShutdown(); + + assertThat(firstFuture, equalTo(secondFuture)); + verify(coordinator).startGracefulShutdown(any(Callable.class)); + + } + @Test public void testRequestShutdownNoLeases() throws Exception { @@ -830,7 +970,7 @@ public class WorkerTest { verify(executorService, never()).submit(argThat( both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.INITIALIZE)))); - worker.requestShutdown(); + worker.createWorkerShutdownCallable().call(); worker.runProcessLoop(); verify(executorService, never()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class)) @@ -909,7 +1049,7 @@ public class WorkerTest { .withField(InitializeTask.class, "shardInfo", equalTo(shardInfo2))))); worker.getShardInfoShardConsumerMap().remove(shardInfo2); - worker.requestShutdown(); + worker.createWorkerShutdownCallable().call(); leases.remove(1); currentAssignments.remove(1); worker.runProcessLoop(); @@ -1194,6 +1334,24 @@ public class WorkerTest { } + private abstract class InjectableWorker extends Worker { + InjectableWorker(String applicationName, IRecordProcessorFactory recordProcessorFactory, + StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream, + long parentShardPollIntervalMillis, long shardSyncIdleTimeMillis, + boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint, + KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, + IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, + boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) { + super(applicationName, recordProcessorFactory, streamConfig, initialPositionInStream, + parentShardPollIntervalMillis, shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, + checkpoint, leaseCoordinator, execService, metricsFactory, taskBackoffTimeMillis, + failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, shardPrioritization); + postConstruct(); + } + + abstract void postConstruct(); + } + private KinesisClientLease makeLease(ExtendedSequenceNumber checkpoint, int shardId) { return new KinesisClientLeaseBuilder().withCheckpoint(checkpoint).withConcurrencyToken(UUID.randomUUID()) .withLastCounterIncrementNanos(0L).withLeaseCounter(0L).withOwnerSwitchesSinceCheckpoint(0L) diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxyTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxyTest.java index db0e3d0c..2b7aa0b7 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxyTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxyTest.java @@ -1,3 +1,17 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ package com.amazonaws.services.kinesis.clientlibrary.proxies; import static org.hamcrest.Matchers.both; diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/types/ShutdownReasonTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/types/ShutdownReasonTest.java index 011e0721..0b9a72f1 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/types/ShutdownReasonTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/types/ShutdownReasonTest.java @@ -1,3 +1,17 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ package com.amazonaws.services.kinesis.clientlibrary.types; import static org.hamcrest.CoreMatchers.equalTo; diff --git a/src/test/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseBuilder.java b/src/test/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseBuilder.java index df39b9f2..90a1676d 100644 --- a/src/test/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseBuilder.java +++ b/src/test/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseBuilder.java @@ -1,3 +1,17 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ package com.amazonaws.services.kinesis.leases.impl; import java.util.HashSet; diff --git a/src/test/java/com/amazonaws/services/kinesis/multilang/Matchers.java b/src/test/java/com/amazonaws/services/kinesis/multilang/Matchers.java index b84d61a0..92ac15f7 100644 --- a/src/test/java/com/amazonaws/services/kinesis/multilang/Matchers.java +++ b/src/test/java/com/amazonaws/services/kinesis/multilang/Matchers.java @@ -1,3 +1,17 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ package com.amazonaws.services.kinesis.multilang; import static org.hamcrest.CoreMatchers.equalTo;