diff --git a/.travis.yml b/.travis.yml index ebb7a2ac..320f811c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,6 +1,6 @@ language: java jdk: - - openjdk7 - - oraclejdk7 + - openjdk8 - oraclejdk8 -sudo: false \ No newline at end of file +sudo: false +dist: trusty \ No newline at end of file diff --git a/META-INF/MANIFEST.MF b/META-INF/MANIFEST.MF index 986ff77d..826c1ddf 100644 --- a/META-INF/MANIFEST.MF +++ b/META-INF/MANIFEST.MF @@ -2,7 +2,7 @@ Manifest-Version: 1.0 Bundle-ManifestVersion: 2 Bundle-Name: Amazon Kinesis Client Library for Java Bundle-SymbolicName: com.amazonaws.kinesisclientlibrary;singleton:=true -Bundle-Version: 1.7.6 +Bundle-Version: 1.8.1 Bundle-Vendor: Amazon Technologies, Inc Bundle-RequiredExecutionEnvironment: JavaSE-1.7 Require-Bundle: org.apache.commons.codec;bundle-version="1.6", diff --git a/README.md b/README.md index 098adc8e..e0320dbe 100644 --- a/README.md +++ b/README.md @@ -23,12 +23,31 @@ The **Amazon Kinesis Client Library for Java** (Amazon KCL) enables Java develop After you've downloaded the code from GitHub, you can build it using Maven. To disable GPG signing in the build, use this command: `mvn clean install -Dgpg.skip=true` ## Integration with the Kinesis Producer Library -For producer-side developers using the **[Kinesis Producer Library (KPL)][kinesis-guide-kpl]**, the KCL integrates without additional effort.  When the KCL retrieves an aggregated Amazon Kinesis record consisting of multiple KPL user records, it will automatically invoke the KPL to extract the individual user records before returning them to the user. +For producer-side developers using the **[Kinesis Producer Library (KPL)][kinesis-guide-kpl]**, the KCL integrates without additional effort. When the KCL retrieves an aggregated Amazon Kinesis record consisting of multiple KPL user records, it will automatically invoke the KPL to extract the individual user records before returning them to the user. ## Amazon KCL support for other languages To make it easier for developers to write record processors in other languages, we have implemented a Java based daemon, called MultiLangDaemon that does all the heavy lifting. Our approach has the daemon spawn a sub-process, which in turn runs the record processor, which can be written in any language. The MultiLangDaemon process and the record processor sub-process communicate with each other over [STDIN and STDOUT using a defined protocol][multi-lang-protocol]. There will be a one to one correspondence amongst record processors, child processes, and shards. For Python developers specifically, we have abstracted these implementation details away and [expose an interface][kclpy] that enables you to focus on writing record processing logic in Python. This approach enables KCL to be language agnostic, while providing identical features and similar parallel processing model across all languages. ## Release Notes +### Release 1.8.1 (August 2, 2017) +* Support timeouts for calls to the MultiLang Daemon + This adds support for setting a timeout when dispatching records to the client record processor. If the record processor doesn't respond within the timeout the parent Java process will be terminated. This is a temporary fix to handle cases where the KCL becomes blocked while waiting for a client record processor. + The timeout for the this can be set by adding `timeoutInSeconds = `. The default for this is no timeout. + __Setting this can cause the KCL to exit suddenly, before using this ensure that you have an automated restart for your application__ + * [PR #195](https://github.com/awslabs/amazon-kinesis-client/pull/195) + * [Issue #185](https://github.com/awslabs/amazon-kinesis-client/issues/185) + +### Release 1.8.0 (July 25, 2017) +* Execute graceful shutdown on its own thread + * [PR #191](https://github.com/awslabs/amazon-kinesis-client/pull/191) + * [Issue #167](https://github.com/awslabs/amazon-kinesis-client/issues/167) +* Added support for controlling the size of the lease renewer thread pool + * [PR #177](https://github.com/awslabs/amazon-kinesis-client/pull/177) + * [Issue #171](https://github.com/awslabs/amazon-kinesis-client/issues/171) +* Require Java 8 and later + __Java 8 is now required for versions 1.8.0 of the amazon-kinesis-client and later.__ + * [PR #176](https://github.com/awslabs/amazon-kinesis-client/issues/176) + ### Release 1.7.6 (June 21, 2017) * Added support for graceful shutdown in MultiLang Clients * [PR #174](https://github.com/awslabs/amazon-kinesis-client/pull/174) diff --git a/build.properties b/build.properties deleted file mode 100644 index 9a6b868a..00000000 --- a/build.properties +++ /dev/null @@ -1,10 +0,0 @@ -source.. = src/main/java,\ - src/main/resources -output.. = bin/ - -bin.includes = LICENSE.txt,\ - NOTICE.txt,\ - META-INF/,\ - . - -jre.compilation.profile = JavaSE-1.7 diff --git a/pom.xml b/pom.xml index b4a1d10e..a2bc6511 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ amazon-kinesis-client jar Amazon Kinesis Client Library for Java - 1.7.6 + 1.8.1 The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data from Amazon Kinesis. @@ -25,7 +25,7 @@ - 1.11.151 + 1.11.171 1.0.392 libsqlite4java ${project.build.directory}/test-lib @@ -131,8 +131,8 @@ maven-compiler-plugin 3.2 - 1.7 - 1.7 + 1.8 + 1.8 UTF-8 diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/v2/IShutdownNotificationAware.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/v2/IShutdownNotificationAware.java index 82a18a0e..b4d4629c 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/v2/IShutdownNotificationAware.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/v2/IShutdownNotificationAware.java @@ -1,3 +1,17 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ package com.amazonaws.services.kinesis.clientlibrary.interfaces.v2; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java index 2d92d7d7..d967b2c3 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java @@ -1,3 +1,17 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; /** diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GracefulShutdownContext.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GracefulShutdownContext.java new file mode 100644 index 00000000..22a4d92b --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GracefulShutdownContext.java @@ -0,0 +1,33 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import lombok.Data; + +import java.util.concurrent.CountDownLatch; + +@Data +class GracefulShutdownContext { + private final CountDownLatch shutdownCompleteLatch; + private final CountDownLatch notificationCompleteLatch; + private final Worker worker; + + static GracefulShutdownContext SHUTDOWN_ALREADY_COMPLETED = new GracefulShutdownContext(null, null, null); + + boolean isShutdownAlreadyCompleted() { + return shutdownCompleteLatch == null && notificationCompleteLatch == null && worker == null; + } + +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GracefulShutdownCoordinator.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GracefulShutdownCoordinator.java new file mode 100644 index 00000000..97bef9e3 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GracefulShutdownCoordinator.java @@ -0,0 +1,163 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +class GracefulShutdownCoordinator { + + Future startGracefulShutdown(Callable shutdownCallable) { + FutureTask task = new FutureTask<>(shutdownCallable); + Thread shutdownThread = new Thread(task, "RequestedShutdownThread"); + shutdownThread.start(); + return task; + + } + + Callable createGracefulShutdownCallable(Callable startWorkerShutdown) { + return new GracefulShutdownCallable(startWorkerShutdown); + } + + static class GracefulShutdownCallable implements Callable { + + private static final Log log = LogFactory.getLog(GracefulShutdownCallable.class); + + private final Callable startWorkerShutdown; + + GracefulShutdownCallable(Callable startWorkerShutdown) { + this.startWorkerShutdown = startWorkerShutdown; + } + + private boolean isWorkerShutdownComplete(GracefulShutdownContext context) { + return context.getWorker().isShutdownComplete() || context.getWorker().getShardInfoShardConsumerMap().isEmpty(); + } + + private String awaitingLogMessage(GracefulShutdownContext context) { + long awaitingNotification = context.getNotificationCompleteLatch().getCount(); + long awaitingFinalShutdown = context.getShutdownCompleteLatch().getCount(); + + return String.format( + "Waiting for %d record process to complete shutdown notification, and %d record processor to complete final shutdown ", + awaitingNotification, awaitingFinalShutdown); + } + + private String awaitingFinalShutdownMessage(GracefulShutdownContext context) { + long outstanding = context.getShutdownCompleteLatch().getCount(); + return String.format("Waiting for %d record processors to complete final shutdown", outstanding); + } + + private boolean waitForRecordProcessors(GracefulShutdownContext context) { + + // + // Awaiting for all ShardConsumer/RecordProcessors to be notified that a shutdown has been requested. + // There is the possibility of a race condition where a lease is terminated after the shutdown request + // notification is started, but before the ShardConsumer is sent the notification. In this case the + // ShardConsumer would start the lease loss shutdown, and may never call the notification methods. + // + try { + while (!context.getNotificationCompleteLatch().await(1, TimeUnit.SECONDS)) { + if (Thread.interrupted()) { + throw new InterruptedException(); + } + log.info(awaitingLogMessage(context)); + if (workerShutdownWithRemaining(context.getShutdownCompleteLatch().getCount(), context)) { + return false; + } + } + } catch (InterruptedException ie) { + log.warn("Interrupted while waiting for notification complete, terminating shutdown. " + + awaitingLogMessage(context)); + return false; + } + + if (Thread.interrupted()) { + log.warn("Interrupted before worker shutdown, terminating shutdown"); + return false; + } + + // + // Once all record processors have been notified of the shutdown it is safe to allow the worker to + // start its shutdown behavior. Once shutdown starts it will stop renewer, and drop any remaining leases. + // + context.getWorker().shutdown(); + + if (Thread.interrupted()) { + log.warn("Interrupted after worker shutdown, terminating shutdown"); + return false; + } + + // + // Want to wait for all the remaining ShardConsumers/RecordProcessor's to complete their final shutdown + // processing. This should really be a no-op since as part of the notification completion the lease for + // ShardConsumer is terminated. + // + try { + while (!context.getShutdownCompleteLatch().await(1, TimeUnit.SECONDS)) { + if (Thread.interrupted()) { + throw new InterruptedException(); + } + log.info(awaitingFinalShutdownMessage(context)); + if (workerShutdownWithRemaining(context.getShutdownCompleteLatch().getCount(), context)) { + return false; + } + } + } catch (InterruptedException ie) { + log.warn("Interrupted while waiting for shutdown completion, terminating shutdown. " + + awaitingFinalShutdownMessage(context)); + return false; + } + return true; + } + + /** + * This checks to see if the worker has already hit it's shutdown target, while there is outstanding record + * processors. This maybe a little racy due to when the value of outstanding is retrieved. In general though the + * latch should be decremented before the shutdown completion. + * + * @param outstanding + * the number of record processor still awaiting shutdown. + */ + private boolean workerShutdownWithRemaining(long outstanding, GracefulShutdownContext context) { + if (isWorkerShutdownComplete(context)) { + if (outstanding != 0) { + log.info("Shutdown completed, but shutdownCompleteLatch still had outstanding " + outstanding + + " with a current value of " + context.getShutdownCompleteLatch().getCount() + ". shutdownComplete: " + + context.getWorker().isShutdownComplete() + " -- Consumer Map: " + + context.getWorker().getShardInfoShardConsumerMap().size()); + return true; + } + } + return false; + } + + @Override + public Boolean call() throws Exception { + GracefulShutdownContext context; + try { + context = startWorkerShutdown.call(); + } catch (Exception ex) { + log.warn("Caught exception while requesting initial worker shutdown.", ex); + throw ex; + } + return context.isShutdownAlreadyCompleted() || waitForRecordProcessors(context); + } + } +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index b708a30a..24112ed8 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -15,8 +15,11 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import java.util.Date; +import java.util.Optional; import java.util.Set; +import org.apache.commons.lang.Validate; + import com.amazonaws.ClientConfiguration; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.regions.RegionUtils; @@ -25,6 +28,8 @@ import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope; import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; import com.google.common.collect.ImmutableSet; +import lombok.Getter; + /** * Configuration for the Amazon Kinesis Client Library. */ @@ -121,7 +126,7 @@ public class KinesisClientLibConfiguration { /** * User agent set when Amazon Kinesis Client Library makes AWS requests. */ - public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.7.6"; + public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.8.1"; /** * KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls @@ -155,10 +160,10 @@ public class KinesisClientLibConfiguration { */ public static final int DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY = 10; - /* - * The Worker will skip shard sync during initialization if there are one or more leases in the lease table. - * This assumes that the shards and leases are in-sync. - * This enables customers to choose faster startup times (e.g. during incremental deployments of an application). + /** + * The Worker will skip shard sync during initialization if there are one or more leases in the lease table. This + * assumes that the shards and leases are in-sync. This enables customers to choose faster startup times (e.g. + * during incremental deployments of an application). */ public static final boolean DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST = false; @@ -167,6 +172,11 @@ public class KinesisClientLibConfiguration { */ public static final ShardPrioritization DEFAULT_SHARD_PRIORITIZATION = new NoOpShardPrioritization(); + /** + * The size of the thread pool to create for the lease renewer to use. + */ + public static final int DEFAULT_MAX_LEASE_RENEWAL_THREADS = 20; + private String applicationName; private String tableName; private String streamName; @@ -204,6 +214,12 @@ public class KinesisClientLibConfiguration { private boolean skipShardSyncAtWorkerInitializationIfLeasesExist; private ShardPrioritization shardPrioritization; + @Getter + private Optional timeoutInSeconds = Optional.empty(); + + @Getter + private int maxLeaseRenewalThreads = DEFAULT_MAX_LEASE_RENEWAL_THREADS; + /** * Constructor. * @@ -1075,4 +1091,31 @@ public class KinesisClientLibConfiguration { this.shardPrioritization = shardPrioritization; return this; } + + /** + * Sets the size of the thread pool that will be used to renew leases. + * + * Setting this to low may starve the lease renewal process, and cause the worker to lose leases at a higher rate. + * + * @param maxLeaseRenewalThreads + * the maximum size of the lease renewal thread pool + * @throws IllegalArgumentException + * if maxLeaseRenewalThreads is <= 0 + * @return this configuration object + */ + public KinesisClientLibConfiguration withMaxLeaseRenewalThreads(int maxLeaseRenewalThreads) { + Validate.isTrue(maxLeaseRenewalThreads > 2, + "The maximum number of lease renewal threads must be greater than or equal to 2."); + this.maxLeaseRenewalThreads = maxLeaseRenewalThreads; + + return this; + } + + /** + * @param timeoutInSeconds The timeout in seconds to wait for the MultiLangProtocol to wait for + */ + public void withTimeoutInSeconds(final int timeoutInSeconds) { + this.timeoutInSeconds = Optional.of(timeoutInSeconds); + } + } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java index 5bd6ed98..448a2953 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java @@ -101,9 +101,10 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator { - - private static final Log log = LogFactory.getLog(ShutdownFuture.class); - - private final CountDownLatch shutdownCompleteLatch; - private final CountDownLatch notificationCompleteLatch; - private final Worker worker; - - ShutdownFuture(CountDownLatch shutdownCompleteLatch, CountDownLatch notificationCompleteLatch, Worker worker) { - this.shutdownCompleteLatch = shutdownCompleteLatch; - this.notificationCompleteLatch = notificationCompleteLatch; - this.worker = worker; - } - - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - throw new UnsupportedOperationException("Cannot cancel a shutdown process"); - } - - @Override - public boolean isCancelled() { - return false; - } - - @Override - public boolean isDone() { - return isWorkerShutdownComplete(); - } - - private boolean isWorkerShutdownComplete() { - return worker.isShutdownComplete() || worker.getShardInfoShardConsumerMap().isEmpty(); - } - - private long outstandingRecordProcessors(long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { - - final long startNanos = System.nanoTime(); - - // - // Awaiting for all ShardConsumer/RecordProcessors to be notified that a shutdown has been requested. - // There is the possibility of a race condition where a lease is terminated after the shutdown request - // notification is started, but before the ShardConsumer is sent the notification. In this case the - // ShardConsumer would start the lease loss shutdown, and may never call the notification methods. - // - if (!notificationCompleteLatch.await(timeout, unit)) { - long awaitingNotification = notificationCompleteLatch.getCount(); - long awaitingFinalShutdown = shutdownCompleteLatch.getCount(); - log.info("Awaiting " + awaitingNotification + " record processors to complete shutdown notification, and " - + awaitingFinalShutdown + " awaiting final shutdown"); - if (awaitingFinalShutdown != 0) { - // - // The number of record processor awaiting final shutdown should be a superset of the those awaiting - // notification - // - return checkWorkerShutdownMiss(awaitingFinalShutdown); - } - } - - long remaining = remainingTimeout(timeout, unit, startNanos); - throwTimeoutMessageIfExceeded(remaining, "Notification hasn't completed within timeout time."); - - // - // Once all record processors have been notified of the shutdown it is safe to allow the worker to - // start its shutdown behavior. Once shutdown starts it will stop renewer, and drop any remaining leases. - // - worker.shutdown(); - remaining = remainingTimeout(timeout, unit, startNanos); - throwTimeoutMessageIfExceeded(remaining, "Shutdown hasn't completed within timeout time."); - - // - // Want to wait for all the remaining ShardConsumers/RecordProcessor's to complete their final shutdown - // processing. This should really be a no-op since as part of the notification completion the lease for - // ShardConsumer is terminated. - // - if (!shutdownCompleteLatch.await(remaining, TimeUnit.NANOSECONDS)) { - long outstanding = shutdownCompleteLatch.getCount(); - log.info("Awaiting " + outstanding + " record processors to complete final shutdown"); - - return checkWorkerShutdownMiss(outstanding); - } - return 0; - } - - private long remainingTimeout(long timeout, TimeUnit unit, long startNanos) { - long checkNanos = System.nanoTime() - startNanos; - return unit.toNanos(timeout) - checkNanos; - } - - private void throwTimeoutMessageIfExceeded(long remainingNanos, String message) throws TimeoutException { - if (remainingNanos <= 0) { - throw new TimeoutException(message); - } - } - - /** - * This checks to see if the worker has already hit it's shutdown target, while there is outstanding record - * processors. This maybe a little racy due to when the value of outstanding is retrieved. In general though the - * latch should be decremented before the shutdown completion. - * - * @param outstanding - * the number of record processor still awaiting shutdown. - * @return the number of record processors awaiting shutdown, or 0 if the worker believes it's shutdown already. - */ - private long checkWorkerShutdownMiss(long outstanding) { - if (isWorkerShutdownComplete()) { - if (outstanding != 0) { - log.info("Shutdown completed, but shutdownCompleteLatch still had outstanding " + outstanding - + " with a current value of " + shutdownCompleteLatch.getCount() + ". shutdownComplete: " - + worker.isShutdownComplete() + " -- Consumer Map: " - + worker.getShardInfoShardConsumerMap().size()); - } - return 0; - } - return outstanding; - } - - @Override - public Void get() throws InterruptedException, ExecutionException { - boolean complete = false; - do { - try { - long outstanding = outstandingRecordProcessors(1, TimeUnit.SECONDS); - complete = outstanding == 0; - log.info("Awaiting " + outstanding + " consumer(s) to finish shutdown."); - } catch (TimeoutException te) { - log.info("Timeout while waiting for completion: " + te.getMessage()); - } - - } while(!complete); - return null; - } - - @Override - public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - long outstanding = outstandingRecordProcessors(timeout, unit); - if (outstanding != 0) { - throw new TimeoutException("Awaiting " + outstanding + " record processors to shutdown."); - } - return null; - } -} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownNotification.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownNotification.java index 928e6900..8fd492cf 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownNotification.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownNotification.java @@ -1,3 +1,17 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownNotificationTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownNotificationTask.java index a689ee43..11997367 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownNotificationTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownNotificationTask.java @@ -1,3 +1,17 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ThrottlingReporter.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ThrottlingReporter.java index f88f131f..f80bdd29 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ThrottlingReporter.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ThrottlingReporter.java @@ -1,3 +1,17 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import lombok.Getter; diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index bf9f4e7d..fd461e31 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -1,16 +1,16 @@ /* - * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at * - * http://aws.amazon.com/asl/ + * http://aws.amazon.com/asl/ * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; @@ -18,15 +18,18 @@ import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -52,14 +55,12 @@ import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory; import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; import com.google.common.annotations.VisibleForTesting; -import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ThreadFactoryBuilder; /** - * Worker is the high level class that Kinesis applications use to start - * processing data. It initializes and oversees different components (e.g. - * syncing shard and lease information, tracking shard assignments, and - * processing data from the shards). + * Worker is the high level class that Kinesis applications use to start processing data. It initializes and oversees + * different components (e.g. syncing shard and lease information, tracking shard assignments, and processing data from + * the shards). */ public class Worker implements Runnable { @@ -77,7 +78,7 @@ public class Worker implements Runnable { private final long idleTimeInMilliseconds; // Backoff time when polling to check if application has finished processing // parent shards - private final long parentShardPollIntervalMillis; + private final long parentShardPollIntervalMillis; private final ExecutorService executorService; private final IMetricsFactory metricsFactory; // Backoff time when running tasks if they encounter exceptions @@ -96,17 +97,27 @@ public class Worker implements Runnable { // Holds consumers for shards the worker is currently tracking. Key is shard // info, value is ShardConsumer. - private ConcurrentMap shardInfoShardConsumerMap = - new ConcurrentHashMap(); + private ConcurrentMap shardInfoShardConsumerMap = new ConcurrentHashMap(); private final boolean cleanupLeasesUponShardCompletion; - + private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist; + /** + * Used to ensure that only one requestedShutdown is in progress at a time. + */ + private Future gracefulShutdownFuture; + @VisibleForTesting + protected boolean gracefuleShutdownStarted = false; + @VisibleForTesting + protected GracefulShutdownCoordinator gracefulShutdownCoordinator = new GracefulShutdownCoordinator(); + /** * Constructor. * - * @param recordProcessorFactory Used to get record processor instances for processing data from shards - * @param config Kinesis Client Library configuration + * @param recordProcessorFactory + * Used to get record processor instances for processing data from shards + * @param config + * Kinesis Client Library configuration */ public Worker( com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, @@ -117,120 +128,128 @@ public class Worker implements Runnable { /** * Constructor. * - * @param recordProcessorFactory Used to get record processor instances for processing data from shards - * @param config Kinesis Client Library configuration - * @param execService ExecutorService to use for processing records (support for multi-threaded - * consumption) + * @param recordProcessorFactory + * Used to get record processor instances for processing data from shards + * @param config + * Kinesis Client Library configuration + * @param execService + * ExecutorService to use for processing records (support for multi-threaded consumption) */ public Worker( com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, - KinesisClientLibConfiguration config, - ExecutorService execService) { - this(recordProcessorFactory, config, new AmazonKinesisClient(config.getKinesisCredentialsProvider(), - config.getKinesisClientConfiguration()), + KinesisClientLibConfiguration config, ExecutorService execService) { + this(recordProcessorFactory, config, + new AmazonKinesisClient(config.getKinesisCredentialsProvider(), config.getKinesisClientConfiguration()), new AmazonDynamoDBClient(config.getDynamoDBCredentialsProvider(), config.getDynamoDBClientConfiguration()), new AmazonCloudWatchClient(config.getCloudWatchCredentialsProvider(), - config.getCloudWatchClientConfiguration()), execService); + config.getCloudWatchClientConfiguration()), + execService); } /** - * @param recordProcessorFactory Used to get record processor instances for processing data from shards - * @param config Kinesis Client Library configuration - * @param metricsFactory Metrics factory used to emit metrics + * @param recordProcessorFactory + * Used to get record processor instances for processing data from shards + * @param config + * Kinesis Client Library configuration + * @param metricsFactory + * Metrics factory used to emit metrics */ public Worker( com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, - KinesisClientLibConfiguration config, - IMetricsFactory metricsFactory) { + KinesisClientLibConfiguration config, IMetricsFactory metricsFactory) { this(recordProcessorFactory, config, metricsFactory, getExecutorService()); } /** - * @param recordProcessorFactory Used to get record processor instances for processing data from shards - * @param config Kinesis Client Library configuration - * @param metricsFactory Metrics factory used to emit metrics - * @param execService ExecutorService to use for processing records (support for multi-threaded - * consumption) + * @param recordProcessorFactory + * Used to get record processor instances for processing data from shards + * @param config + * Kinesis Client Library configuration + * @param metricsFactory + * Metrics factory used to emit metrics + * @param execService + * ExecutorService to use for processing records (support for multi-threaded consumption) */ public Worker( com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, - KinesisClientLibConfiguration config, - IMetricsFactory metricsFactory, - ExecutorService execService) { - this(recordProcessorFactory, config, new AmazonKinesisClient(config.getKinesisCredentialsProvider(), - config.getKinesisClientConfiguration()), + KinesisClientLibConfiguration config, IMetricsFactory metricsFactory, ExecutorService execService) { + this(recordProcessorFactory, config, + new AmazonKinesisClient(config.getKinesisCredentialsProvider(), config.getKinesisClientConfiguration()), new AmazonDynamoDBClient(config.getDynamoDBCredentialsProvider(), - config.getDynamoDBClientConfiguration()), metricsFactory, execService); + config.getDynamoDBClientConfiguration()), + metricsFactory, execService); } /** - * @param recordProcessorFactory Used to get record processor instances for processing data from shards - * @param config Kinesis Client Library configuration - * @param kinesisClient Kinesis Client used for fetching data - * @param dynamoDBClient DynamoDB client used for checkpoints and tracking leases - * @param cloudWatchClient CloudWatch Client for publishing metrics + * @param recordProcessorFactory + * Used to get record processor instances for processing data from shards + * @param config + * Kinesis Client Library configuration + * @param kinesisClient + * Kinesis Client used for fetching data + * @param dynamoDBClient + * DynamoDB client used for checkpoints and tracking leases + * @param cloudWatchClient + * CloudWatch Client for publishing metrics */ public Worker( com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, - KinesisClientLibConfiguration config, - AmazonKinesis kinesisClient, - AmazonDynamoDB dynamoDBClient, + KinesisClientLibConfiguration config, AmazonKinesis kinesisClient, AmazonDynamoDB dynamoDBClient, AmazonCloudWatch cloudWatchClient) { this(recordProcessorFactory, config, kinesisClient, dynamoDBClient, cloudWatchClient, getExecutorService()); } /** - * @param recordProcessorFactory Used to get record processor instances for processing data from shards - * @param config Kinesis Client Library configuration - * @param kinesisClient Kinesis Client used for fetching data - * @param dynamoDBClient DynamoDB client used for checkpoints and tracking leases - * @param cloudWatchClient CloudWatch Client for publishing metrics - * @param execService ExecutorService to use for processing records (support for multi-threaded - * consumption) + * @param recordProcessorFactory + * Used to get record processor instances for processing data from shards + * @param config + * Kinesis Client Library configuration + * @param kinesisClient + * Kinesis Client used for fetching data + * @param dynamoDBClient + * DynamoDB client used for checkpoints and tracking leases + * @param cloudWatchClient + * CloudWatch Client for publishing metrics + * @param execService + * ExecutorService to use for processing records (support for multi-threaded consumption) */ public Worker( com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, - KinesisClientLibConfiguration config, - AmazonKinesis kinesisClient, - AmazonDynamoDB dynamoDBClient, - AmazonCloudWatch cloudWatchClient, - ExecutorService execService) { - this(recordProcessorFactory, config, kinesisClient, dynamoDBClient, - getMetricsFactory(cloudWatchClient, config), execService); + KinesisClientLibConfiguration config, AmazonKinesis kinesisClient, AmazonDynamoDB dynamoDBClient, + AmazonCloudWatch cloudWatchClient, ExecutorService execService) { + this(recordProcessorFactory, config, kinesisClient, dynamoDBClient, getMetricsFactory(cloudWatchClient, config), + execService); } /** - * @param recordProcessorFactory Used to get record processor instances for processing data from shards - * @param config Kinesis Client Library configuration - * @param kinesisClient Kinesis Client used for fetching data - * @param dynamoDBClient DynamoDB client used for checkpoints and tracking leases - * @param metricsFactory Metrics factory used to emit metrics - * @param execService ExecutorService to use for processing records (support for multi-threaded - * consumption) + * @param recordProcessorFactory + * Used to get record processor instances for processing data from shards + * @param config + * Kinesis Client Library configuration + * @param kinesisClient + * Kinesis Client used for fetching data + * @param dynamoDBClient + * DynamoDB client used for checkpoints and tracking leases + * @param metricsFactory + * Metrics factory used to emit metrics + * @param execService + * ExecutorService to use for processing records (support for multi-threaded consumption) */ public Worker( com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, - KinesisClientLibConfiguration config, - AmazonKinesis kinesisClient, - AmazonDynamoDB dynamoDBClient, - IMetricsFactory metricsFactory, - ExecutorService execService) { - this( - config.getApplicationName(), - new V1ToV2RecordProcessorFactoryAdapter(recordProcessorFactory), + KinesisClientLibConfiguration config, AmazonKinesis kinesisClient, AmazonDynamoDB dynamoDBClient, + IMetricsFactory metricsFactory, ExecutorService execService) { + this(config.getApplicationName(), new V1ToV2RecordProcessorFactoryAdapter(recordProcessorFactory), new StreamConfig( new KinesisProxyFactory(config.getKinesisCredentialsProvider(), kinesisClient) - .getProxy(config.getStreamName()), + .getProxy(config.getStreamName()), config.getMaxRecords(), config.getIdleTimeBetweenReadsInMillis(), config.shouldCallProcessRecordsEvenForEmptyRecordList(), config.shouldValidateSequenceNumberBeforeCheckpointing(), config.getInitialPositionInStreamExtended()), - config.getInitialPositionInStreamExtended(), - config.getParentShardPollIntervalMillis(), - config.getShardSyncIntervalMillis(), - config.shouldCleanupLeasesUponShardCompletion(), - null, + config.getInitialPositionInStreamExtended(), config.getParentShardPollIntervalMillis(), + config.getShardSyncIntervalMillis(), config.shouldCleanupLeasesUponShardCompletion(), null, new KinesisClientLibLeaseCoordinator( new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient), config.getWorkerIdentifier(), @@ -238,6 +257,7 @@ public class Worker implements Runnable { config.getEpsilonMillis(), config.getMaxLeasesForWorker(), config.getMaxLeasesToStealAtOneTime(), + config.getMaxLeaseRenewalThreads(), metricsFactory) .withInitialLeaseTableReadCapacity(config.getInitialLeaseTableReadCapacity()) .withInitialLeaseTableWriteCapacity(config.getInitialLeaseTableWriteCapacity()), @@ -269,48 +289,50 @@ public class Worker implements Runnable { + ", and Amazon Kinesis endpoint as " + config.getKinesisEndpoint() + ". Amazon Kinesis endpoint will overwrite region name."); LOG.debug("The region of Amazon Kinesis client has been overwritten to " + config.getKinesisEndpoint()); - } else { + } else { LOG.debug("The region of Amazon Kinesis client has been set to " + config.getKinesisEndpoint()); } } } /** - * @param applicationName Name of the Kinesis application - * @param recordProcessorFactory Used to get record processor instances for processing data from shards - * @param streamConfig Stream configuration - * @param initialPositionInStream One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. The KinesisClientLibrary will start - * fetching data from this location in the stream when an application starts up for the first time and - * there are no checkpoints. If there are checkpoints, we start from the checkpoint position. - * @param parentShardPollIntervalMillis Wait for this long between polls to check if parent shards are done - * @param shardSyncIdleTimeMillis Time between tasks to sync leases and Kinesis shards - * @param cleanupLeasesUponShardCompletion Clean up shards we've finished processing (don't wait till they expire in - * Kinesis) - * @param checkpoint Used to get/set checkpoints - * @param leaseCoordinator Lease coordinator (coordinates currently owned leases) - * @param execService ExecutorService to use for processing records (support for multi-threaded - * consumption) - * @param metricsFactory Metrics factory used to emit metrics - * @param taskBackoffTimeMillis Backoff period when tasks encounter an exception - * @param shardPrioritization Provides prioritization logic to decide which available shards process first + * @param applicationName + * Name of the Kinesis application + * @param recordProcessorFactory + * Used to get record processor instances for processing data from shards + * @param streamConfig + * Stream configuration + * @param initialPositionInStream + * One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. The KinesisClientLibrary will start fetching data from + * this location in the stream when an application starts up for the first time and there are no + * checkpoints. If there are checkpoints, we start from the checkpoint position. + * @param parentShardPollIntervalMillis + * Wait for this long between polls to check if parent shards are done + * @param shardSyncIdleTimeMillis + * Time between tasks to sync leases and Kinesis shards + * @param cleanupLeasesUponShardCompletion + * Clean up shards we've finished processing (don't wait till they expire in Kinesis) + * @param checkpoint + * Used to get/set checkpoints + * @param leaseCoordinator + * Lease coordinator (coordinates currently owned leases) + * @param execService + * ExecutorService to use for processing records (support for multi-threaded consumption) + * @param metricsFactory + * Metrics factory used to emit metrics + * @param taskBackoffTimeMillis + * Backoff period when tasks encounter an exception + * @param shardPrioritization + * Provides prioritization logic to decide which available shards process first */ // NOTE: This has package level access solely for testing // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES - Worker(String applicationName, - IRecordProcessorFactory recordProcessorFactory, - StreamConfig streamConfig, - InitialPositionInStreamExtended initialPositionInStream, - long parentShardPollIntervalMillis, - long shardSyncIdleTimeMillis, - boolean cleanupLeasesUponShardCompletion, - ICheckpoint checkpoint, - KinesisClientLibLeaseCoordinator leaseCoordinator, - ExecutorService execService, - IMetricsFactory metricsFactory, - long taskBackoffTimeMillis, - long failoverTimeMillis, - boolean skipShardSyncAtWorkerInitializationIfLeasesExist, - ShardPrioritization shardPrioritization) { + Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, StreamConfig streamConfig, + InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis, + long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint, + KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, + IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, + boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) { this.applicationName = applicationName; this.recordProcessorFactory = recordProcessorFactory; this.streamConfig = streamConfig; @@ -322,16 +344,11 @@ public class Worker implements Runnable { this.executorService = execService; this.leaseCoordinator = leaseCoordinator; this.metricsFactory = metricsFactory; - this.controlServer = - new ShardSyncTaskManager(streamConfig.getStreamProxy(), - leaseCoordinator.getLeaseManager(), - initialPositionInStream, - cleanupLeasesUponShardCompletion, - shardSyncIdleTimeMillis, - metricsFactory, - executorService); + this.controlServer = new ShardSyncTaskManager(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(), + initialPositionInStream, cleanupLeasesUponShardCompletion, shardSyncIdleTimeMillis, metricsFactory, + executorService); this.taskBackoffTimeMillis = taskBackoffTimeMillis; - this.failoverTimeMillis = failoverTimeMillis; + this.failoverTimeMillis = failoverTimeMillis; this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist; this.shardPrioritization = shardPrioritization; } @@ -344,8 +361,7 @@ public class Worker implements Runnable { } /** - * Start consuming data from the stream, and pass it to the application - * record processors. + * Start consuming data from the stream, and pass it to the application record processors. */ public void run() { if (shutdown) { @@ -418,12 +434,8 @@ public class Worker implements Runnable { if (!skipShardSyncAtWorkerInitializationIfLeasesExist || leaseCoordinator.getLeaseManager().isLeaseTableEmpty()) { LOG.info("Syncing Kinesis shard info"); - ShardSyncTask shardSyncTask = - new ShardSyncTask(streamConfig.getStreamProxy(), - leaseCoordinator.getLeaseManager(), - initialPosition, - cleanupLeasesUponShardCompletion, - 0L); + ShardSyncTask shardSyncTask = new ShardSyncTask(streamConfig.getStreamProxy(), + leaseCoordinator.getLeaseManager(), initialPosition, cleanupLeasesUponShardCompletion, 0L); result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call(); } else { LOG.info("Skipping shard sync per config setting (and lease table is not empty)"); @@ -460,14 +472,13 @@ public class Worker implements Runnable { } /** - * NOTE: This method is internal/private to the Worker class. It has package - * access solely for testing. + * NOTE: This method is internal/private to the Worker class. It has package access solely for testing. * * This method relies on ShardInfo.equals() method returning true for ShardInfo objects which may have been * instantiated with parentShardIds in a different order (and rest of the fields being the equal). For example - * shardInfo1.equals(shardInfo2) should return true with shardInfo1 and shardInfo2 defined as follows. - * ShardInfo shardInfo1 = new ShardInfo(shardId1, concurrencyToken1, Arrays.asList("parent1", "parent2")); - * ShardInfo shardInfo2 = new ShardInfo(shardId1, concurrencyToken1, Arrays.asList("parent2", "parent1")); + * shardInfo1.equals(shardInfo2) should return true with shardInfo1 and shardInfo2 defined as follows. ShardInfo + * shardInfo1 = new ShardInfo(shardId1, concurrencyToken1, Arrays.asList("parent1", "parent2")); ShardInfo + * shardInfo2 = new ShardInfo(shardId1, concurrencyToken1, Arrays.asList("parent2", "parent1")); */ void cleanupShardConsumers(Set assignedShards) { for (ShardInfo shard : shardInfoShardConsumerMap.keySet()) { @@ -507,8 +518,57 @@ public class Worker implements Runnable { } /** - * Requests shutdown of the worker, notifying record processors, that implement {@link IShutdownNotificationAware}, - * of the impending shutdown. This gives the record processor a final chance to checkpoint. + * Starts the requestedShutdown process, and returns a future that can be used to track the process. + * + * This is deprecated in favor of {@link #startGracefulShutdown()}, which returns a more complete future, and + * indicates the process behavior + * + * @return a future that will be set once shutdown is completed. + */ + @Deprecated + public Future requestShutdown() { + + Future requestedShutdownFuture = startGracefulShutdown(); + + return new Future() { + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return requestedShutdownFuture.cancel(mayInterruptIfRunning); + } + + @Override + public boolean isCancelled() { + return requestedShutdownFuture.isCancelled(); + } + + @Override + public boolean isDone() { + return requestedShutdownFuture.isDone(); + } + + @Override + public Void get() throws InterruptedException, ExecutionException { + requestedShutdownFuture.get(); + return null; + } + + @Override + public Void get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + requestedShutdownFuture.get(timeout, unit); + return null; + } + }; + } + + /** + * Requests a graceful shutdown of the worker, notifying record processors, that implement + * {@link IShutdownNotificationAware}, of the impending shutdown. This gives the record processor a final chance to + * checkpoint. + * + * This will only create a single shutdown future. Additional attempts to start a graceful shutdown will return the + * previous future. * * It's possible that a record processor won't be notify before being shutdown. This can occur if the lease is * lost after requesting shutdown, but before the notification is dispatched. @@ -532,48 +592,85 @@ public class Worker implements Runnable { *
  • Once the worker shutdown is complete, the returned future is completed.
  • * * - * - * - * @return a Future that will be set once the shutdown is complete. + * @return a future that will be set once the shutdown has completed. True indicates that the graceful shutdown + * completed successfully. A false value indicates that a non-exception case caused the shutdown process to + * terminate early. */ - public Future requestShutdown() { - - // - // Stop accepting new leases. Once we do this we can be sure that - // no more leases will be acquired. - // - leaseCoordinator.stopLeaseTaker(); - - Collection leases = leaseCoordinator.getAssignments(); - if (leases == null || leases.isEmpty()) { - // - // If there are no leases notification is already completed, but we still need to shutdown the worker. - // - this.shutdown(); - return Futures.immediateFuture(null); - } - CountDownLatch shutdownCompleteLatch = new CountDownLatch(leases.size()); - CountDownLatch notificationCompleteLatch = new CountDownLatch(leases.size()); - for (KinesisClientLease lease : leases) { - ShutdownNotification shutdownNotification = new ShardConsumerShutdownNotification(leaseCoordinator, lease, - notificationCompleteLatch, shutdownCompleteLatch); - ShardInfo shardInfo = KinesisClientLibLeaseCoordinator.convertLeaseToAssignment(lease); - ShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo); - if (consumer != null) { - consumer.notifyShutdownRequested(shutdownNotification); - } else { - // - // There is a race condition between retrieving the current assignments, and creating the - // notification. If the a lease is lost in between these two points, we explicitly decrement the - // notification latches to clear the shutdown. - // - notificationCompleteLatch.countDown(); - shutdownCompleteLatch.countDown(); + public Future startGracefulShutdown() { + synchronized (this) { + if (gracefulShutdownFuture == null) { + gracefulShutdownFuture = gracefulShutdownCoordinator + .startGracefulShutdown(createGracefulShutdownCallable()); } } + return gracefulShutdownFuture; + } - return new ShutdownFuture(shutdownCompleteLatch, notificationCompleteLatch, this); + /** + * Creates a callable that will execute the graceful shutdown process. This callable can be used to execute graceful + * shutdowns in your own executor, or execute the shutdown synchronously. + * + * @return a callable that run the graceful shutdown process. This may return a callable that return true if the + * graceful shutdown has already been completed. + * @throws IllegalStateException + * thrown by the callable if another callable has already started the shutdown process. + */ + public Callable createGracefulShutdownCallable() { + if (isShutdownComplete()) { + return () -> true; + } + Callable startShutdown = createWorkerShutdownCallable(); + return gracefulShutdownCoordinator.createGracefulShutdownCallable(startShutdown); + } + public boolean hasGracefulShutdownStarted() { + return gracefuleShutdownStarted; + } + + @VisibleForTesting + Callable createWorkerShutdownCallable() { + return () -> { + synchronized (this) { + if (this.gracefuleShutdownStarted) { + throw new IllegalStateException("Requested shutdown has already been started"); + } + this.gracefuleShutdownStarted = true; + } + // + // Stop accepting new leases. Once we do this we can be sure that + // no more leases will be acquired. + // + leaseCoordinator.stopLeaseTaker(); + + Collection leases = leaseCoordinator.getAssignments(); + if (leases == null || leases.isEmpty()) { + // + // If there are no leases notification is already completed, but we still need to shutdown the worker. + // + this.shutdown(); + return GracefulShutdownContext.SHUTDOWN_ALREADY_COMPLETED; + } + CountDownLatch shutdownCompleteLatch = new CountDownLatch(leases.size()); + CountDownLatch notificationCompleteLatch = new CountDownLatch(leases.size()); + for (KinesisClientLease lease : leases) { + ShutdownNotification shutdownNotification = new ShardConsumerShutdownNotification(leaseCoordinator, + lease, notificationCompleteLatch, shutdownCompleteLatch); + ShardInfo shardInfo = KinesisClientLibLeaseCoordinator.convertLeaseToAssignment(lease); + ShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo); + if (consumer != null) { + consumer.notifyShutdownRequested(shutdownNotification); + } else { + // + // There is a race condition between retrieving the current assignments, and creating the + // notification. If the a lease is lost in between these two points, we explicitly decrement the + // notification latches to clear the shutdown. + // + notificationCompleteLatch.countDown(); + shutdownCompleteLatch.countDown(); + } + } + return new GracefulShutdownContext(shutdownCompleteLatch, notificationCompleteLatch, this); + }; } boolean isShutdownComplete() { @@ -617,8 +714,8 @@ public class Worker implements Runnable { } /** - * Perform final shutdown related tasks for the worker including shutting down worker owned - * executor services, threads, etc. + * Perform final shutdown related tasks for the worker including shutting down worker owned executor services, + * threads, etc. */ private void finalShutdown() { LOG.info("Starting worker's final shutdown."); @@ -659,11 +756,12 @@ public class Worker implements Runnable { } /** - * NOTE: This method is internal/private to the Worker class. It has package - * access solely for testing. + * NOTE: This method is internal/private to the Worker class. It has package access solely for testing. * - * @param shardInfo Kinesis shard info - * @param factory RecordProcessor factory + * @param shardInfo + * Kinesis shard info + * @param factory + * RecordProcessor factory * @return ShardConsumer for the shard */ ShardConsumer createOrGetShardConsumer(ShardInfo shardInfo, IRecordProcessorFactory factory) { @@ -687,15 +785,15 @@ public class Worker implements Runnable { return new ShardConsumer(shardInfo, streamConfig, checkpointTracker, recordProcessor, leaseCoordinator.getLeaseManager(), parentShardPollIntervalMillis, cleanupLeasesUponShardCompletion, - executorService, metricsFactory, taskBackoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist); + executorService, metricsFactory, taskBackoffTimeMillis, + skipShardSyncAtWorkerInitializationIfLeasesExist); } /** - * Logger for suppressing too much INFO logging. To avoid too much logging - * information Worker will output logging at INFO level for a single pass - * through the main loop every minute. At DEBUG level it will output all - * INFO logs on every pass. + * Logger for suppressing too much INFO logging. To avoid too much logging information Worker will output logging at + * INFO level for a single pass through the main loop every minute. At DEBUG level it will output all INFO logs on + * every pass. */ private static class WorkerLog { @@ -751,90 +849,89 @@ public class Worker implements Runnable { // Backwards compatible constructors /** - * This constructor is for binary compatibility with code compiled against - * version of the KCL that only have constructors taking "Client" objects. + * This constructor is for binary compatibility with code compiled against version of the KCL that only have + * constructors taking "Client" objects. * - * @param recordProcessorFactory Used to get record processor instances for processing data from shards - * @param config Kinesis Client Library configuration - * @param kinesisClient Kinesis Client used for fetching data - * @param dynamoDBClient DynamoDB client used for checkpoints and tracking leases - * @param cloudWatchClient CloudWatch Client for publishing metrics + * @param recordProcessorFactory + * Used to get record processor instances for processing data from shards + * @param config + * Kinesis Client Library configuration + * @param kinesisClient + * Kinesis Client used for fetching data + * @param dynamoDBClient + * DynamoDB client used for checkpoints and tracking leases + * @param cloudWatchClient + * CloudWatch Client for publishing metrics */ public Worker( com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, - KinesisClientLibConfiguration config, - AmazonKinesisClient kinesisClient, - AmazonDynamoDBClient dynamoDBClient, - AmazonCloudWatchClient cloudWatchClient) { - this(recordProcessorFactory, - config, - (AmazonKinesis) kinesisClient, - (AmazonDynamoDB) dynamoDBClient, + KinesisClientLibConfiguration config, AmazonKinesisClient kinesisClient, + AmazonDynamoDBClient dynamoDBClient, AmazonCloudWatchClient cloudWatchClient) { + this(recordProcessorFactory, config, (AmazonKinesis) kinesisClient, (AmazonDynamoDB) dynamoDBClient, (AmazonCloudWatch) cloudWatchClient); } /** - * This constructor is for binary compatibility with code compiled against - * version of the KCL that only have constructors taking "Client" objects. + * This constructor is for binary compatibility with code compiled against version of the KCL that only have + * constructors taking "Client" objects. * - * @param recordProcessorFactory Used to get record processor instances for processing data from shards - * @param config Kinesis Client Library configuration - * @param kinesisClient Kinesis Client used for fetching data - * @param dynamoDBClient DynamoDB client used for checkpoints and tracking leases - * @param cloudWatchClient CloudWatch Client for publishing metrics - * @param execService ExecutorService to use for processing records (support for multi-threaded - * consumption) + * @param recordProcessorFactory + * Used to get record processor instances for processing data from shards + * @param config + * Kinesis Client Library configuration + * @param kinesisClient + * Kinesis Client used for fetching data + * @param dynamoDBClient + * DynamoDB client used for checkpoints and tracking leases + * @param cloudWatchClient + * CloudWatch Client for publishing metrics + * @param execService + * ExecutorService to use for processing records (support for multi-threaded consumption) */ public Worker( com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, - KinesisClientLibConfiguration config, - AmazonKinesisClient kinesisClient, - AmazonDynamoDBClient dynamoDBClient, - AmazonCloudWatchClient cloudWatchClient, - ExecutorService execService) { - this(recordProcessorFactory, - config, - (AmazonKinesis) kinesisClient, - (AmazonDynamoDB) dynamoDBClient, - (AmazonCloudWatch) cloudWatchClient, - execService); + KinesisClientLibConfiguration config, AmazonKinesisClient kinesisClient, + AmazonDynamoDBClient dynamoDBClient, AmazonCloudWatchClient cloudWatchClient, ExecutorService execService) { + this(recordProcessorFactory, config, (AmazonKinesis) kinesisClient, (AmazonDynamoDB) dynamoDBClient, + (AmazonCloudWatch) cloudWatchClient, execService); } /** - * This constructor is for binary compatibility with code compiled against - * version of the KCL that only have constructors taking "Client" objects. + * This constructor is for binary compatibility with code compiled against version of the KCL that only have + * constructors taking "Client" objects. * - * @param recordProcessorFactory Used to get record processor instances for processing data from shards - * @param config Kinesis Client Library configuration - * @param kinesisClient Kinesis Client used for fetching data - * @param dynamoDBClient DynamoDB client used for checkpoints and tracking leases - * @param metricsFactory Metrics factory used to emit metrics - * @param execService ExecutorService to use for processing records (support for multi-threaded - * consumption) + * @param recordProcessorFactory + * Used to get record processor instances for processing data from shards + * @param config + * Kinesis Client Library configuration + * @param kinesisClient + * Kinesis Client used for fetching data + * @param dynamoDBClient + * DynamoDB client used for checkpoints and tracking leases + * @param metricsFactory + * Metrics factory used to emit metrics + * @param execService + * ExecutorService to use for processing records (support for multi-threaded consumption) */ public Worker( com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, - KinesisClientLibConfiguration config, - AmazonKinesisClient kinesisClient, - AmazonDynamoDBClient dynamoDBClient, - IMetricsFactory metricsFactory, - ExecutorService execService) { - this(recordProcessorFactory, - config, - (AmazonKinesis) kinesisClient, - (AmazonDynamoDB) dynamoDBClient, - metricsFactory, - execService); + KinesisClientLibConfiguration config, AmazonKinesisClient kinesisClient, + AmazonDynamoDBClient dynamoDBClient, IMetricsFactory metricsFactory, ExecutorService execService) { + this(recordProcessorFactory, config, (AmazonKinesis) kinesisClient, (AmazonDynamoDB) dynamoDBClient, + metricsFactory, execService); } /** * Given configuration, returns appropriate metrics factory. - * @param cloudWatchClient Amazon CloudWatch client - * @param config KinesisClientLibConfiguration + * + * @param cloudWatchClient + * Amazon CloudWatch client + * @param config + * KinesisClientLibConfiguration * @return Returns metrics factory based on the config. */ - private static IMetricsFactory getMetricsFactory( - AmazonCloudWatch cloudWatchClient, KinesisClientLibConfiguration config) { + private static IMetricsFactory getMetricsFactory(AmazonCloudWatch cloudWatchClient, + KinesisClientLibConfiguration config) { IMetricsFactory metricsFactory; if (config.getMetricsLevel() == MetricsLevel.NONE) { metricsFactory = new NullMetricsFactory(); @@ -844,12 +941,8 @@ public class Worker implements Runnable { cloudWatchClient.setRegion(region); LOG.debug("The region of Amazon CloudWatch client has been set to " + config.getRegionName()); } - metricsFactory = new WorkerCWMetricsFactory( - cloudWatchClient, - config.getApplicationName(), - config.getMetricsBufferTimeMillis(), - config.getMetricsMaxQueueSize(), - config.getMetricsLevel(), + metricsFactory = new WorkerCWMetricsFactory(cloudWatchClient, config.getApplicationName(), + config.getMetricsBufferTimeMillis(), config.getMetricsMaxQueueSize(), config.getMetricsLevel(), config.getMetricsEnabledDimensions()); } return metricsFactory; @@ -857,6 +950,7 @@ public class Worker implements Runnable { /** * Returns default executor service that should be used by the worker. + * * @return Default executor service that should be used by the worker. */ private static ExecutorService getExecutorService() { @@ -865,26 +959,19 @@ public class Worker implements Runnable { } /** - * Extension to CWMetricsFactory, so worker can identify whether it owns the metrics factory instance - * or not. + * Extension to CWMetricsFactory, so worker can identify whether it owns the metrics factory instance or not. * Visible and non-final only for testing. */ static class WorkerCWMetricsFactory extends CWMetricsFactory { - WorkerCWMetricsFactory(AmazonCloudWatch cloudWatchClient, - String namespace, - long bufferTimeMillis, - int maxQueueSize, - MetricsLevel metricsLevel, - Set metricsEnabledDimensions) { - super(cloudWatchClient, namespace, bufferTimeMillis, - maxQueueSize, metricsLevel, metricsEnabledDimensions); + WorkerCWMetricsFactory(AmazonCloudWatch cloudWatchClient, String namespace, long bufferTimeMillis, + int maxQueueSize, MetricsLevel metricsLevel, Set metricsEnabledDimensions) { + super(cloudWatchClient, namespace, bufferTimeMillis, maxQueueSize, metricsLevel, metricsEnabledDimensions); } } /** - * Extension to ThreadPoolExecutor, so worker can identify whether it owns the executor service instance - * or not. + * Extension to ThreadPoolExecutor, so worker can identify whether it owns the executor service instance or not. * Visible and non-final only for testing. */ static class WorkerThreadPoolExecutor extends ThreadPoolExecutor { @@ -918,24 +1005,25 @@ public class Worker implements Runnable { } /** - * Provide a V1 - * {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor IRecordProcessor}. + * Provide a V1 {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor + * IRecordProcessor}. * - * @param recordProcessorFactory Used to get record processor instances for processing data from shards + * @param recordProcessorFactory + * Used to get record processor instances for processing data from shards * @return A reference to this updated object so that method calls can be chained together. */ public Builder recordProcessorFactory( - com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory - recordProcessorFactory) { + com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory) { this.recordProcessorFactory = new V1ToV2RecordProcessorFactoryAdapter(recordProcessorFactory); return this; } /** - * Provide a V2 - * {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor IRecordProcessor}. + * Provide a V2 {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor + * IRecordProcessor}. * - * @param recordProcessorFactory Used to get record processor instances for processing data from shards + * @param recordProcessorFactory + * Used to get record processor instances for processing data from shards * @return A reference to this updated object so that method calls can be chained together. */ public Builder recordProcessorFactory(IRecordProcessorFactory recordProcessorFactory) { @@ -946,7 +1034,8 @@ public class Worker implements Runnable { /** * Set the Worker config. * - * @param config Kinesis Client Library configuration + * @param config + * Kinesis Client Library configuration * @return A reference to this updated object so that method calls can be chained together. */ public Builder config(KinesisClientLibConfiguration config) { @@ -957,7 +1046,8 @@ public class Worker implements Runnable { /** * Set the Kinesis client. * - * @param kinesisClient Kinesis Client used for fetching data + * @param kinesisClient + * Kinesis Client used for fetching data * @return A reference to this updated object so that method calls can be chained together. */ public Builder kinesisClient(AmazonKinesis kinesisClient) { @@ -968,7 +1058,8 @@ public class Worker implements Runnable { /** * Set the DynamoDB client. * - * @param dynamoDBClient DynamoDB client used for checkpoints and tracking leases + * @param dynamoDBClient + * DynamoDB client used for checkpoints and tracking leases * @return A reference to this updated object so that method calls can be chained together. */ public Builder dynamoDBClient(AmazonDynamoDB dynamoDBClient) { @@ -979,7 +1070,8 @@ public class Worker implements Runnable { /** * Set the Cloudwatch client. * - * @param cloudWatchClient CloudWatch Client for publishing metrics + * @param cloudWatchClient + * CloudWatch Client for publishing metrics * @return A reference to this updated object so that method calls can be chained together. */ public Builder cloudWatchClient(AmazonCloudWatch cloudWatchClient) { @@ -990,7 +1082,8 @@ public class Worker implements Runnable { /** * Set the metrics factory. * - * @param metricsFactory Metrics factory used to emit metrics + * @param metricsFactory + * Metrics factory used to emit metrics * @return A reference to this updated object so that method calls can be chained together. */ public Builder metricsFactory(IMetricsFactory metricsFactory) { @@ -1001,7 +1094,8 @@ public class Worker implements Runnable { /** * Set the executor service for processing records. * - * @param execService ExecutorService to use for processing records (support for multi-threaded consumption) + * @param execService + * ExecutorService to use for processing records (support for multi-threaded consumption) * @return A reference to this updated object so that method calls can be chained together. */ public Builder execService(ExecutorService execService) { @@ -1035,8 +1129,7 @@ public class Worker implements Runnable { "Kinesis Client Library configuration needs to be provided to build Worker"); } if (recordProcessorFactory == null) { - throw new IllegalArgumentException( - "A Record Processor Factory needs to be provided to build Worker"); + throw new IllegalArgumentException("A Record Processor Factory needs to be provided to build Worker"); } if (execService == null) { @@ -1078,7 +1171,7 @@ public class Worker implements Runnable { + ". Amazon Kinesis endpoint will overwrite region name."); LOG.debug("The region of Amazon Kinesis client has been overwritten to " + config.getKinesisEndpoint()); - } else { + } else { LOG.debug("The region of Amazon Kinesis client has been set to " + config.getKinesisEndpoint()); } } @@ -1089,6 +1182,7 @@ public class Worker implements Runnable { shardPrioritization = new ParentsFirstShardPrioritization(1); } + return new Worker(config.getApplicationName(), recordProcessorFactory, new StreamConfig(new KinesisProxyFactory(config.getKinesisCredentialsProvider(), @@ -1110,6 +1204,7 @@ public class Worker implements Runnable { config.getEpsilonMillis(), config.getMaxLeasesForWorker(), config.getMaxLeasesToStealAtOneTime(), + config.getMaxLeaseRenewalThreads(), metricsFactory) .withInitialLeaseTableReadCapacity(config.getInitialLeaseTableReadCapacity()) .withInitialLeaseTableWriteCapacity(config.getInitialLeaseTableWriteCapacity()), diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/Messages.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/Messages.java index 87545b7e..a467ee57 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/Messages.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/Messages.java @@ -1,3 +1,18 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + // Generated by the protocol buffer compiler. DO NOT EDIT! // source: messages.proto diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/utils/NamedThreadFactory.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/utils/NamedThreadFactory.java index 29d8a7be..4be5a092 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/utils/NamedThreadFactory.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/utils/NamedThreadFactory.java @@ -1,3 +1,17 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ package com.amazonaws.services.kinesis.clientlibrary.utils; import java.util.concurrent.Executors; diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCoordinator.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCoordinator.java index 76cc7e87..f1a87aaf 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCoordinator.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCoordinator.java @@ -19,15 +19,17 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedTransferQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import com.amazonaws.services.kinesis.clientlibrary.utils.NamedThreadFactory; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; import com.amazonaws.services.kinesis.leases.exceptions.DependencyException; import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.leases.exceptions.LeasingException; @@ -40,6 +42,7 @@ import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper; import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope; import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; +import com.google.common.util.concurrent.ThreadFactoryBuilder; /** * LeaseCoordinator abstracts away LeaseTaker and LeaseRenewer from the application code that's using leasing. It owns @@ -63,12 +66,10 @@ public class LeaseCoordinator { private static final int DEFAULT_MAX_LEASES_FOR_WORKER = Integer.MAX_VALUE; private static final int DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME = 1; - private static final ThreadFactory LEASE_COORDINATOR_THREAD_FACTORY = new NamedThreadFactory("LeaseCoordinator-"); - private static final ThreadFactory LEASE_RENEWAL_THREAD_FACTORY = new NamedThreadFactory("LeaseRenewer-"); - - // Package level access for testing. - static final int MAX_LEASE_RENEWAL_THREAD_COUNT = 20; - + private static final ThreadFactory LEASE_COORDINATOR_THREAD_FACTORY = new ThreadFactoryBuilder() + .setNameFormat("LeaseCoordinator-%04d").setDaemon(true).build(); + private static final ThreadFactory LEASE_RENEWAL_THREAD_FACTORY = new ThreadFactoryBuilder() + .setNameFormat("LeaseRenewer-%04d").setDaemon(true).build(); private final ILeaseRenewer leaseRenewer; private final ILeaseTaker leaseTaker; @@ -114,7 +115,8 @@ public class LeaseCoordinator { long epsilonMillis, IMetricsFactory metricsFactory) { this(leaseManager, workerIdentifier, leaseDurationMillis, epsilonMillis, - DEFAULT_MAX_LEASES_FOR_WORKER, DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME, metricsFactory); + DEFAULT_MAX_LEASES_FOR_WORKER, DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME, + KinesisClientLibConfiguration.DEFAULT_MAX_LEASE_RENEWAL_THREADS, metricsFactory); } /** @@ -134,8 +136,9 @@ public class LeaseCoordinator { long epsilonMillis, int maxLeasesForWorker, int maxLeasesToStealAtOneTime, + int maxLeaseRenewerThreadCount, IMetricsFactory metricsFactory) { - this.leaseRenewalThreadpool = getLeaseRenewalExecutorService(MAX_LEASE_RENEWAL_THREAD_COUNT); + this.leaseRenewalThreadpool = getLeaseRenewalExecutorService(maxLeaseRenewerThreadCount); this.leaseTaker = new LeaseTaker(leaseManager, workerIdentifier, leaseDurationMillis) .withMaxLeasesForWorker(maxLeasesForWorker) .withMaxLeasesToStealAtOneTime(maxLeasesToStealAtOneTime); @@ -366,6 +369,9 @@ public class LeaseCoordinator { * @return Executor service that should be used for lease renewal. */ private static ExecutorService getLeaseRenewalExecutorService(int maximumPoolSize) { - return Executors.newFixedThreadPool(maximumPoolSize, LEASE_RENEWAL_THREAD_FACTORY); + int coreLeaseCount = Math.max(maximumPoolSize / 4, 2); + + return new ThreadPoolExecutor(coreLeaseCount, maximumPoolSize, 60, TimeUnit.SECONDS, + new LinkedTransferQueue(), LEASE_RENEWAL_THREAD_FACTORY); } } diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemonConfig.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemonConfig.java index f191eedc..01b3a27f 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemonConfig.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemonConfig.java @@ -100,7 +100,7 @@ public class MultiLangDaemonConfig { kinesisClientLibConfig = configurator.getConfiguration(properties); executorService = buildExecutorService(properties); - recordProcessorFactory = new MultiLangRecordProcessorFactory(executableName, executorService); + recordProcessorFactory = new MultiLangRecordProcessorFactory(executableName, executorService, kinesisClientLibConfig); LOG.info("Running " + kinesisClientLibConfig.getApplicationName() + " to process stream " + kinesisClientLibConfig.getStreamName() + " with executable " + executableName); diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java index 67a97770..0376242a 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java @@ -14,11 +14,9 @@ */ package com.amazonaws.services.kinesis.multilang; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; - import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; @@ -29,9 +27,14 @@ import com.amazonaws.services.kinesis.multilang.messages.ProcessRecordsMessage; import com.amazonaws.services.kinesis.multilang.messages.ShutdownMessage; import com.amazonaws.services.kinesis.multilang.messages.ShutdownRequestedMessage; import com.amazonaws.services.kinesis.multilang.messages.StatusMessage; - import lombok.extern.apachecommons.CommonsLog; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + /** * An implementation of the multi language protocol. */ @@ -41,10 +44,11 @@ class MultiLangProtocol { private MessageReader messageReader; private MessageWriter messageWriter; private final InitializationInput initializationInput; + private KinesisClientLibConfiguration configuration; /** * Constructor. - * + * * @param messageReader * A message reader. * @param messageWriter @@ -53,16 +57,17 @@ class MultiLangProtocol { * information about the shard this processor is starting to process */ MultiLangProtocol(MessageReader messageReader, MessageWriter messageWriter, - InitializationInput initializationInput) { + InitializationInput initializationInput, KinesisClientLibConfiguration configuration) { this.messageReader = messageReader; this.messageWriter = messageWriter; this.initializationInput = initializationInput; + this.configuration = configuration; } /** * Writes an {@link InitializeMessage} to the child process's STDIN and waits for the child process to respond with * a {@link StatusMessage} on its STDOUT. - * + * * @return Whether or not this operation succeeded. */ boolean initialize() { @@ -77,7 +82,7 @@ class MultiLangProtocol { /** * Writes a {@link ProcessRecordsMessage} to the child process's STDIN and waits for the child process to respond * with a {@link StatusMessage} on its STDOUT. - * + * * @param processRecordsInput * The records, and associated metadata, to process. * @return Whether or not this operation succeeded. @@ -90,7 +95,7 @@ class MultiLangProtocol { /** * Writes a {@link ShutdownMessage} to the child process's STDIN and waits for the child process to respond with a * {@link StatusMessage} on its STDOUT. - * + * * @param checkpointer A checkpointer. * @param reason Why this processor is being shutdown. * @return Whether or not this operation succeeded. @@ -119,7 +124,7 @@ class MultiLangProtocol { * all communications with the child process regarding checkpointing were successful. Note that whether or not the * checkpointing itself was successful is not the concern of this method. This method simply cares whether it was * able to successfully communicate the results of its attempts to checkpoint. - * + * * @param action * What action is being waited on. * @param checkpointer @@ -150,44 +155,75 @@ class MultiLangProtocol { /** * Waits for status message and verifies it against the expectation - * + * * @param action * What action is being waited on. * @param checkpointer * the original process records request * @return Whether or not this operation succeeded. */ - private boolean waitForStatusMessage(String action, IRecordProcessorCheckpointer checkpointer) { - StatusMessage statusMessage = null; - while (statusMessage == null) { + boolean waitForStatusMessage(String action, IRecordProcessorCheckpointer checkpointer) { + Optional statusMessage = Optional.empty(); + while (!statusMessage.isPresent()) { Future future = this.messageReader.getNextMessageFromSTDOUT(); - try { - Message message = future.get(); - // Note that instanceof doubles as a check against a value being null - if (message instanceof CheckpointMessage) { - boolean checkpointWriteSucceeded = checkpoint((CheckpointMessage) message, checkpointer).get(); - if (!checkpointWriteSucceeded) { - return false; - } - } else if (message instanceof StatusMessage) { - statusMessage = (StatusMessage) message; - } - } catch (InterruptedException e) { - log.error(String.format("Interrupted while waiting for %s message for shard %s", action, - initializationInput.getShardId())); - return false; - } catch (ExecutionException e) { - log.error(String.format("Failed to get status message for %s action for shard %s", action, - initializationInput.getShardId()), e); + Optional message = configuration.getTimeoutInSeconds() + .map(second -> futureMethod(() -> future.get(second, TimeUnit.SECONDS), action)) + .orElse(futureMethod(future::get, action)); + + if (!message.isPresent()) { return false; } + + Optional checkpointFailed = message.filter(m -> m instanceof CheckpointMessage ) + .map(m -> (CheckpointMessage) m) + .flatMap(m -> futureMethod(() -> checkpoint(m, checkpointer).get(), "Checkpoint")) + .map(checkpointSuccess -> !checkpointSuccess); + + if (checkpointFailed.orElse(false)) { + return false; + } + + statusMessage = message.filter(m -> m instanceof StatusMessage).map(m -> (StatusMessage) m ); } - return this.validateStatusMessage(statusMessage, action); + return this.validateStatusMessage(statusMessage.get(), action); + } + + private interface FutureMethod { + T get() throws InterruptedException, TimeoutException, ExecutionException; + } + + private Optional futureMethod(FutureMethod fm, String action) { + try { + return Optional.of(fm.get()); + } catch (InterruptedException e) { + log.error(String.format("Interrupted while waiting for %s message for shard %s", action, + initializationInput.getShardId()), e); + } catch (ExecutionException e) { + log.error(String.format("Failed to get status message for %s action for shard %s", action, + initializationInput.getShardId()), e); + } catch (TimeoutException e) { + log.error(String.format("Timedout to get status message for %s action for shard %s. Terminating...", + action, + initializationInput.getShardId()), + e); + haltJvm(1); + } + return Optional.empty(); + } + + /** + * This method is used to halt the JVM. Use this method with utmost caution, since this method will kill the JVM + * without calling the Shutdown hooks. + * + * @param exitStatus The exit status with which the JVM is to be halted. + */ + protected void haltJvm(int exitStatus) { + Runtime.getRuntime().halt(exitStatus); } /** * Utility for confirming that the status message is for the provided action. - * + * * @param statusMessage The status of the child process. * @param action The action that was being waited on. * @return Whether or not this operation succeeded. @@ -205,7 +241,7 @@ class MultiLangProtocol { * provided {@link CheckpointMessage}. If no sequence number is provided, i.e. the sequence number is null, then * this method will call {@link IRecordProcessorCheckpointer#checkpoint()}. The method returns a future representing * the attempt to write the result of this checkpoint attempt to the child process. - * + * * @param checkpointMessage A checkpoint message. * @param checkpointer A checkpointer. * @return Whether or not this operation succeeded. diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java index 2532771b..bbcf957b 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java @@ -24,6 +24,7 @@ import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateExcep import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -65,6 +66,8 @@ public class MultiLangRecordProcessor implements IRecordProcessor, IShutdownNoti private MultiLangProtocol protocol; + private KinesisClientLibConfiguration configuration; + @Override public void initialize(InitializationInput initializationInput) { try { @@ -87,7 +90,7 @@ public class MultiLangRecordProcessor implements IRecordProcessor, IShutdownNoti // Submit the error reader for execution stderrReadTask = executorService.submit(readSTDERRTask); - protocol = new MultiLangProtocol(messageReader, messageWriter, initializationInput); + protocol = new MultiLangProtocol(messageReader, messageWriter, initializationInput, configuration); if (!protocol.initialize()) { throw new RuntimeException("Failed to initialize child process"); } @@ -173,9 +176,9 @@ public class MultiLangRecordProcessor implements IRecordProcessor, IShutdownNoti * An obejct mapper. */ MultiLangRecordProcessor(ProcessBuilder processBuilder, ExecutorService executorService, - ObjectMapper objectMapper) { + ObjectMapper objectMapper, KinesisClientLibConfiguration configuration) { this(processBuilder, executorService, objectMapper, new MessageWriter(), new MessageReader(), - new DrainChildSTDERRTask()); + new DrainChildSTDERRTask(), configuration); } /** @@ -195,13 +198,16 @@ public class MultiLangRecordProcessor implements IRecordProcessor, IShutdownNoti * Error reader to read from child process's stderr */ MultiLangRecordProcessor(ProcessBuilder processBuilder, ExecutorService executorService, ObjectMapper objectMapper, - MessageWriter messageWriter, MessageReader messageReader, DrainChildSTDERRTask readSTDERRTask) { + MessageWriter messageWriter, MessageReader messageReader, DrainChildSTDERRTask readSTDERRTask, + KinesisClientLibConfiguration configuration) { this.executorService = executorService; this.processBuilder = processBuilder; this.objectMapper = objectMapper; this.messageWriter = messageWriter; this.messageReader = messageReader; this.readSTDERRTask = readSTDERRTask; + this.configuration = configuration; + this.state = ProcessState.ACTIVE; } diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessorFactory.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessorFactory.java index e55217a6..e596abf2 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessorFactory.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessorFactory.java @@ -16,6 +16,7 @@ package com.amazonaws.services.kinesis.multilang; import java.util.concurrent.ExecutorService; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -39,12 +40,15 @@ public class MultiLangRecordProcessorFactory implements IRecordProcessorFactory private final ExecutorService executorService; + private final KinesisClientLibConfiguration configuration; + /** * @param command The command that will do processing for this factory's record processors. * @param executorService An executor service to use while processing inputs and outputs of the child process. */ - public MultiLangRecordProcessorFactory(String command, ExecutorService executorService) { - this(command, executorService, new ObjectMapper()); + public MultiLangRecordProcessorFactory(String command, ExecutorService executorService, + KinesisClientLibConfiguration configuration) { + this(command, executorService, new ObjectMapper(), configuration); } /** @@ -52,11 +56,13 @@ public class MultiLangRecordProcessorFactory implements IRecordProcessorFactory * @param executorService An executor service to use while processing inputs and outputs of the child process. * @param objectMapper An object mapper used to convert messages to json to be written to the child process */ - public MultiLangRecordProcessorFactory(String command, ExecutorService executorService, ObjectMapper objectMapper) { + public MultiLangRecordProcessorFactory(String command, ExecutorService executorService, ObjectMapper objectMapper, + KinesisClientLibConfiguration configuration) { this.command = command; this.commandArray = command.split(COMMAND_DELIMETER_REGEX); this.executorService = executorService; this.objectMapper = objectMapper; + this.configuration = configuration; } @Override @@ -65,7 +71,8 @@ public class MultiLangRecordProcessorFactory implements IRecordProcessorFactory /* * Giving ProcessBuilder the command as an array of Strings allows users to specify command line arguments. */ - return new MultiLangRecordProcessor(new ProcessBuilder(commandArray), executorService, this.objectMapper); + return new MultiLangRecordProcessor(new ProcessBuilder(commandArray), executorService, this.objectMapper, + this.configuration); } String[] getCommandArray() { diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java index c0a778e9..31272379 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java @@ -1,3 +1,17 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerStates.ConsumerState; diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GracefulShutdownCoordinatorTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GracefulShutdownCoordinatorTest.java new file mode 100644 index 00000000..c032bf0c --- /dev/null +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GracefulShutdownCoordinatorTest.java @@ -0,0 +1,322 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.not; +import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; +import org.mockito.verification.VerificationMode; + +@RunWith(MockitoJUnitRunner.class) +public class GracefulShutdownCoordinatorTest { + + @Mock + private CountDownLatch shutdownCompleteLatch; + @Mock + private CountDownLatch notificationCompleteLatch; + @Mock + private Worker worker; + @Mock + private Callable contextCallable; + @Mock + private ConcurrentMap shardInfoConsumerMap; + + @Test + public void testAllShutdownCompletedAlready() throws Exception { + Callable requestedShutdownCallable = buildRequestedShutdownCallable(); + + when(shutdownCompleteLatch.await(anyLong(), any(TimeUnit.class))).thenReturn(true); + when(notificationCompleteLatch.await(anyLong(), any(TimeUnit.class))).thenReturn(true); + + assertThat(requestedShutdownCallable.call(), equalTo(true)); + verify(shutdownCompleteLatch).await(anyLong(), any(TimeUnit.class)); + verify(notificationCompleteLatch).await(anyLong(), any(TimeUnit.class)); + verify(worker).shutdown(); + } + + @Test + public void testNotificationNotCompletedYet() throws Exception { + Callable requestedShutdownCallable = buildRequestedShutdownCallable(); + + mockLatchAwait(notificationCompleteLatch, false, true); + when(notificationCompleteLatch.getCount()).thenReturn(1L, 0L); + mockLatchAwait(shutdownCompleteLatch, true); + when(shutdownCompleteLatch.getCount()).thenReturn(1L, 1L, 0L); + + when(worker.isShutdownComplete()).thenReturn(false, true); + mockShardInfoConsumerMap(1, 0); + + assertThat(requestedShutdownCallable.call(), equalTo(true)); + verify(notificationCompleteLatch, times(2)).await(anyLong(), any(TimeUnit.class)); + verify(notificationCompleteLatch).getCount(); + + verify(shutdownCompleteLatch).await(anyLong(), any(TimeUnit.class)); + verify(shutdownCompleteLatch, times(2)).getCount(); + + verify(worker).shutdown(); + } + + @Test + public void testShutdownNotCompletedYet() throws Exception { + Callable requestedShutdownCallable = buildRequestedShutdownCallable(); + + mockLatchAwait(notificationCompleteLatch, true); + mockLatchAwait(shutdownCompleteLatch, false, true); + when(shutdownCompleteLatch.getCount()).thenReturn(1L, 0L); + + when(worker.isShutdownComplete()).thenReturn(false, true); + mockShardInfoConsumerMap(1, 0); + + assertThat(requestedShutdownCallable.call(), equalTo(true)); + verify(notificationCompleteLatch).await(anyLong(), any(TimeUnit.class)); + verify(notificationCompleteLatch, never()).getCount(); + + verify(shutdownCompleteLatch, times(2)).await(anyLong(), any(TimeUnit.class)); + verify(shutdownCompleteLatch, times(2)).getCount(); + + verify(worker).shutdown(); + } + + @Test + public void testMultipleAttemptsForNotification() throws Exception { + Callable requestedShutdownCallable = buildRequestedShutdownCallable(); + + mockLatchAwait(notificationCompleteLatch, false, false, true); + when(notificationCompleteLatch.getCount()).thenReturn(2L, 1L, 0L); + + mockLatchAwait(shutdownCompleteLatch, true); + when(shutdownCompleteLatch.getCount()).thenReturn(2L, 2L, 1L, 1L, 0L); + + when(worker.isShutdownComplete()).thenReturn(false, false, false, true); + mockShardInfoConsumerMap(2, 1, 0); + + assertThat(requestedShutdownCallable.call(), equalTo(true)); + + verifyLatchAwait(notificationCompleteLatch, 3); + verify(notificationCompleteLatch, times(2)).getCount(); + + verifyLatchAwait(shutdownCompleteLatch, 1); + verify(shutdownCompleteLatch, times(4)).getCount(); + } + + @Test + public void testWorkerAlreadyShutdownAtNotification() throws Exception { + Callable requestedShutdownCallable = buildRequestedShutdownCallable(); + + mockLatchAwait(notificationCompleteLatch, false, true); + when(notificationCompleteLatch.getCount()).thenReturn(1L, 0L); + + mockLatchAwait(shutdownCompleteLatch, true); + when(shutdownCompleteLatch.getCount()).thenReturn(1L, 1L, 0L); + + when(worker.isShutdownComplete()).thenReturn(true); + mockShardInfoConsumerMap(0); + + assertThat(requestedShutdownCallable.call(), equalTo(false)); + + verifyLatchAwait(notificationCompleteLatch); + verify(notificationCompleteLatch).getCount(); + + verifyLatchAwait(shutdownCompleteLatch, never()); + verify(shutdownCompleteLatch, times(3)).getCount(); + } + + @Test + public void testWorkerAlreadyShutdownAtComplete() throws Exception { + Callable requestedShutdownCallable = buildRequestedShutdownCallable(); + + mockLatchAwait(notificationCompleteLatch, true); + + mockLatchAwait(shutdownCompleteLatch, false, true); + when(shutdownCompleteLatch.getCount()).thenReturn(1L, 1L, 1L); + + when(worker.isShutdownComplete()).thenReturn(true); + mockShardInfoConsumerMap(0); + + assertThat(requestedShutdownCallable.call(), equalTo(false)); + + verifyLatchAwait(notificationCompleteLatch); + verify(notificationCompleteLatch, never()).getCount(); + + verifyLatchAwait(shutdownCompleteLatch); + verify(shutdownCompleteLatch, times(3)).getCount(); + } + + @Test + public void testNotificationInterrupted() throws Exception { + Callable requestedShutdownCallable = buildRequestedShutdownCallable(); + + when(notificationCompleteLatch.await(anyLong(), any(TimeUnit.class))).thenThrow(new InterruptedException()); + when(notificationCompleteLatch.getCount()).thenReturn(1L); + + when(shutdownCompleteLatch.getCount()).thenReturn(1L); + + assertThat(requestedShutdownCallable.call(), equalTo(false)); + verifyLatchAwait(notificationCompleteLatch); + verifyLatchAwait(shutdownCompleteLatch, never()); + verify(worker, never()).shutdown(); + } + + @Test + public void testShutdownInterrupted() throws Exception { + Callable requestedShutdownCallable = buildRequestedShutdownCallable(); + + when(notificationCompleteLatch.await(anyLong(), any(TimeUnit.class))).thenReturn(true); + + when(shutdownCompleteLatch.await(anyLong(), any(TimeUnit.class))).thenThrow(new InterruptedException()); + when(shutdownCompleteLatch.getCount()).thenReturn(1L); + + assertThat(requestedShutdownCallable.call(), equalTo(false)); + verifyLatchAwait(notificationCompleteLatch); + verifyLatchAwait(shutdownCompleteLatch); + verify(worker).shutdown(); + } + + @Test + public void testInterruptedAfterNotification() throws Exception { + Callable requestedShutdownCallable = buildRequestedShutdownCallable(); + + when(notificationCompleteLatch.await(anyLong(), any(TimeUnit.class))).thenAnswer(invocation -> { + Thread.currentThread().interrupt(); + return true; + }); + + assertThat(requestedShutdownCallable.call(), equalTo(false)); + verifyLatchAwait(notificationCompleteLatch); + verifyLatchAwait(shutdownCompleteLatch, never()); + verify(worker, never()).shutdown(); + } + + @Test + public void testInterruptedAfterWorkerShutdown() throws Exception { + Callable requestedShutdownCallable = buildRequestedShutdownCallable(); + + when(notificationCompleteLatch.await(anyLong(), any(TimeUnit.class))).thenReturn(true); + + doAnswer(invocation -> { + Thread.currentThread().interrupt(); + return true; + }).when(worker).shutdown(); + + assertThat(requestedShutdownCallable.call(), equalTo(false)); + verifyLatchAwait(notificationCompleteLatch); + verifyLatchAwait(shutdownCompleteLatch, never()); + verify(worker).shutdown(); + } + + @Test + public void testInterruptedDuringNotification() throws Exception { + Callable requestedShutdownCallable = buildRequestedShutdownCallable(); + + when(notificationCompleteLatch.await(anyLong(), any(TimeUnit.class))).thenAnswer(invocation -> { + Thread.currentThread().interrupt(); + return false; + }); + when(notificationCompleteLatch.getCount()).thenReturn(1L); + + when(shutdownCompleteLatch.getCount()).thenReturn(1L); + + assertThat(requestedShutdownCallable.call(), equalTo(false)); + verifyLatchAwait(notificationCompleteLatch); + verify(notificationCompleteLatch).getCount(); + + verifyLatchAwait(shutdownCompleteLatch, never()); + verify(shutdownCompleteLatch).getCount(); + + verify(worker, never()).shutdown(); + } + + @Test + public void testInterruptedDuringShutdown() throws Exception { + Callable requestedShutdownCallable = buildRequestedShutdownCallable(); + + when(notificationCompleteLatch.await(anyLong(), any(TimeUnit.class))).thenReturn(true); + + when(shutdownCompleteLatch.await(anyLong(), any(TimeUnit.class))).thenAnswer(invocation -> { + Thread.currentThread().interrupt(); + return false; + }); + when(shutdownCompleteLatch.getCount()).thenReturn(1L); + + assertThat(requestedShutdownCallable.call(), equalTo(false)); + verifyLatchAwait(notificationCompleteLatch); + verify(notificationCompleteLatch, never()).getCount(); + + verifyLatchAwait(shutdownCompleteLatch); + verify(shutdownCompleteLatch).getCount(); + + verify(worker).shutdown(); + } + + @Test(expected = IllegalStateException.class) + public void testWorkerShutdownCallableThrows() throws Exception { + Callable requestedShutdownCallable = new GracefulShutdownCoordinator().createGracefulShutdownCallable(contextCallable); + when(contextCallable.call()).thenThrow(new IllegalStateException("Bad Shutdown")); + + requestedShutdownCallable.call(); + } + + private void verifyLatchAwait(CountDownLatch latch) throws Exception { + verifyLatchAwait(latch, times(1)); + } + + private void verifyLatchAwait(CountDownLatch latch, int times) throws Exception { + verifyLatchAwait(latch, times(times)); + } + + private void verifyLatchAwait(CountDownLatch latch, VerificationMode verificationMode) throws Exception { + verify(latch, verificationMode).await(anyLong(), any(TimeUnit.class)); + } + + private void mockLatchAwait(CountDownLatch latch, Boolean initial, Boolean... remaining) throws Exception { + when(latch.await(anyLong(), any(TimeUnit.class))).thenReturn(initial, remaining); + } + + private Callable buildRequestedShutdownCallable() throws Exception { + GracefulShutdownContext context = new GracefulShutdownContext(shutdownCompleteLatch, + notificationCompleteLatch, worker); + when(contextCallable.call()).thenReturn(context); + return new GracefulShutdownCoordinator().createGracefulShutdownCallable(contextCallable); + } + + private void mockShardInfoConsumerMap(Integer initialItemCount, Integer... additionalItemCounts) { + when(worker.getShardInfoShardConsumerMap()).thenReturn(shardInfoConsumerMap); + Boolean additionalEmptyStates[] = new Boolean[additionalItemCounts.length]; + for (int i = 0; i < additionalItemCounts.length; ++i) { + additionalEmptyStates[i] = additionalItemCounts[i] == 0; + } + when(shardInfoConsumerMap.size()).thenReturn(initialItemCount, additionalItemCounts); + when(shardInfoConsumerMap.isEmpty()).thenReturn(initialItemCount == 0, additionalEmptyStates); + } + +} \ No newline at end of file diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ParentsFirstShardPrioritizationUnitTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ParentsFirstShardPrioritizationUnitTest.java index 7ba0753d..42fd82de 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ParentsFirstShardPrioritizationUnitTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ParentsFirstShardPrioritizationUnitTest.java @@ -1,3 +1,17 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import static org.junit.Assert.assertEquals; diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownFutureTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownFutureTest.java deleted file mode 100644 index cccbc9a1..00000000 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownFutureTest.java +++ /dev/null @@ -1,236 +0,0 @@ -package com.amazonaws.services.kinesis.clientlibrary.lib.worker; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyLong; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.runners.MockitoJUnitRunner; -import org.mockito.stubbing.Answer; -import org.mockito.stubbing.OngoingStubbing; - -@RunWith(MockitoJUnitRunner.class) -public class ShutdownFutureTest { - - @Mock - private CountDownLatch shutdownCompleteLatch; - @Mock - private CountDownLatch notificationCompleteLatch; - @Mock - private Worker worker; - @Mock - private ConcurrentMap shardInfoConsumerMap; - - @Test - public void testSimpleGetAlreadyCompleted() throws Exception { - ShutdownFuture future = new ShutdownFuture(shutdownCompleteLatch, notificationCompleteLatch, worker); - - mockNotificationComplete(true); - mockShutdownComplete(true); - - future.get(); - - verify(notificationCompleteLatch).await(anyLong(), any(TimeUnit.class)); - verify(worker).shutdown(); - verify(shutdownCompleteLatch).await(anyLong(), any(TimeUnit.class)); - } - - @Test - public void testNotificationNotCompleted() throws Exception { - ShutdownFuture future = new ShutdownFuture(shutdownCompleteLatch, notificationCompleteLatch, worker); - - mockNotificationComplete(false, true); - mockShutdownComplete(true); - - when(worker.getShardInfoShardConsumerMap()).thenReturn(shardInfoConsumerMap); - when(shardInfoConsumerMap.isEmpty()).thenReturn(false); - when(worker.isShutdownComplete()).thenReturn(false); - - when(notificationCompleteLatch.getCount()).thenReturn(1L); - when(shutdownCompleteLatch.getCount()).thenReturn(1L); - - expectedTimeoutException(future); - - verify(worker, never()).shutdown(); - - awaitFuture(future); - - verify(notificationCompleteLatch).getCount(); - verifyLatchAwait(notificationCompleteLatch, 2); - - verify(shutdownCompleteLatch).getCount(); - verifyLatchAwait(shutdownCompleteLatch); - - verify(worker).shutdown(); - - } - - @Test - public void testShutdownNotCompleted() throws Exception { - ShutdownFuture future = new ShutdownFuture(shutdownCompleteLatch, notificationCompleteLatch, worker); - mockNotificationComplete(true); - mockShutdownComplete(false, true); - - when(shutdownCompleteLatch.getCount()).thenReturn(1L); - when(worker.isShutdownComplete()).thenReturn(false); - - mockShardInfoConsumerMap(1); - - expectedTimeoutException(future); - verify(worker).shutdown(); - awaitFuture(future); - - verifyLatchAwait(notificationCompleteLatch, 2); - verifyLatchAwait(shutdownCompleteLatch, 2); - - verify(worker).isShutdownComplete(); - verify(worker).getShardInfoShardConsumerMap(); - - } - - @Test - public void testShutdownNotCompleteButWorkerShutdown() throws Exception { - ShutdownFuture future = create(); - - mockNotificationComplete(true); - mockShutdownComplete(false); - - when(shutdownCompleteLatch.getCount()).thenReturn(1L); - when(worker.isShutdownComplete()).thenReturn(true); - mockShardInfoConsumerMap(1); - - awaitFuture(future); - verify(worker).shutdown(); - verifyLatchAwait(notificationCompleteLatch); - verifyLatchAwait(shutdownCompleteLatch); - - verify(worker, times(2)).isShutdownComplete(); - verify(worker).getShardInfoShardConsumerMap(); - verify(shardInfoConsumerMap).size(); - } - - @Test - public void testShutdownNotCompleteButShardConsumerEmpty() throws Exception { - ShutdownFuture future = create(); - mockNotificationComplete(true); - mockShutdownComplete(false); - - mockOutstanding(shutdownCompleteLatch, 1L); - - when(worker.isShutdownComplete()).thenReturn(false); - mockShardInfoConsumerMap(0); - - awaitFuture(future); - verify(worker).shutdown(); - verifyLatchAwait(notificationCompleteLatch); - verifyLatchAwait(shutdownCompleteLatch); - - verify(worker, times(2)).isShutdownComplete(); - verify(worker, times(2)).getShardInfoShardConsumerMap(); - - verify(shardInfoConsumerMap).isEmpty(); - verify(shardInfoConsumerMap).size(); - } - - @Test - public void testNotificationNotCompleteButShardConsumerEmpty() throws Exception { - ShutdownFuture future = create(); - mockNotificationComplete(false); - mockShutdownComplete(false); - - mockOutstanding(notificationCompleteLatch, 1L); - mockOutstanding(shutdownCompleteLatch, 1L); - - when(worker.isShutdownComplete()).thenReturn(false); - mockShardInfoConsumerMap(0); - - awaitFuture(future); - verify(worker, never()).shutdown(); - verifyLatchAwait(notificationCompleteLatch); - verify(shutdownCompleteLatch, never()).await(); - - verify(worker, times(2)).isShutdownComplete(); - verify(worker, times(2)).getShardInfoShardConsumerMap(); - - verify(shardInfoConsumerMap).isEmpty(); - verify(shardInfoConsumerMap).size(); - } - - @Test(expected = TimeoutException.class) - public void testTimeExceededException() throws Exception { - ShutdownFuture future = create(); - mockNotificationComplete(false); - mockOutstanding(notificationCompleteLatch, 1L); - when(worker.isShutdownComplete()).thenReturn(false); - mockShardInfoConsumerMap(1); - - future.get(1, TimeUnit.NANOSECONDS); - } - - private ShutdownFuture create() { - return new ShutdownFuture(shutdownCompleteLatch, notificationCompleteLatch, worker); - } - - private void mockShardInfoConsumerMap(Integer initialItemCount, Integer ... additionalItemCounts) { - when(worker.getShardInfoShardConsumerMap()).thenReturn(shardInfoConsumerMap); - Boolean additionalEmptyStates[] = new Boolean[additionalItemCounts.length]; - for(int i = 0; i < additionalItemCounts.length; ++i) { - additionalEmptyStates[i] = additionalItemCounts[i] == 0; - } - when(shardInfoConsumerMap.size()).thenReturn(initialItemCount, additionalItemCounts); - when(shardInfoConsumerMap.isEmpty()).thenReturn(initialItemCount == 0, additionalEmptyStates); - } - - private void verifyLatchAwait(CountDownLatch latch) throws Exception { - verifyLatchAwait(latch, 1); - } - - private void verifyLatchAwait(CountDownLatch latch, int times) throws Exception { - verify(latch, times(times)).await(anyLong(), any(TimeUnit.class)); - } - - private void expectedTimeoutException(ShutdownFuture future) throws Exception { - boolean gotTimeout = false; - try { - awaitFuture(future); - } catch (TimeoutException te) { - gotTimeout = true; - } - assertThat("Expected a timeout exception to occur", gotTimeout); - } - - private void awaitFuture(ShutdownFuture future) throws Exception { - future.get(1, TimeUnit.SECONDS); - } - - private void mockNotificationComplete(Boolean initial, Boolean... states) throws Exception { - mockLatch(notificationCompleteLatch, initial, states); - - } - - private void mockShutdownComplete(Boolean initial, Boolean... states) throws Exception { - mockLatch(shutdownCompleteLatch, initial, states); - } - - private void mockLatch(CountDownLatch latch, Boolean initial, Boolean... states) throws Exception { - when(latch.await(anyLong(), any(TimeUnit.class))).thenReturn(initial, states); - } - - private void mockOutstanding(CountDownLatch latch, Long remaining, Long ... additionalRemaining) throws Exception { - when(latch.getCount()).thenReturn(remaining, additionalRemaining); - } - -} \ No newline at end of file diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ThrottlingReporterTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ThrottlingReporterTest.java index d0645229..79118ac9 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ThrottlingReporterTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ThrottlingReporterTest.java @@ -1,3 +1,17 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import static org.mockito.Matchers.any; diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index daf58165..5913bf0d 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -1,16 +1,16 @@ /* - * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at * - * http://aws.amazon.com/asl/ + * http://aws.amazon.com/asl/ * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; @@ -762,7 +762,7 @@ public class WorkerTest { verify(executorService, atLeastOnce()).submit(argThat( both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.INITIALIZE)))); - worker.requestShutdown(); + worker.createWorkerShutdownCallable().call(); worker.runProcessLoop(); verify(executorService, atLeastOnce()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class)) @@ -781,6 +781,146 @@ public class WorkerTest { } + @Test(expected = IllegalStateException.class) + public void testShutdownCallableNotAllowedTwice() throws Exception { + + IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + StreamConfig streamConfig = mock(StreamConfig.class); + IMetricsFactory metricsFactory = mock(IMetricsFactory.class); + + ExtendedSequenceNumber checkpoint = new ExtendedSequenceNumber("123", 0L); + KinesisClientLeaseBuilder builder = new KinesisClientLeaseBuilder().withCheckpoint(checkpoint) + .withConcurrencyToken(UUID.randomUUID()).withLastCounterIncrementNanos(0L).withLeaseCounter(0L) + .withOwnerSwitchesSinceCheckpoint(0L).withLeaseOwner("Self"); + + final List leases = new ArrayList<>(); + final List currentAssignments = new ArrayList<>(); + KinesisClientLease lease = builder.withLeaseKey(String.format("shardId-%03d", 1)).build(); + leases.add(lease); + currentAssignments.add(new ShardInfo(lease.getLeaseKey(), lease.getConcurrencyToken().toString(), + lease.getParentShardIds(), lease.getCheckpoint())); + + when(leaseCoordinator.getAssignments()).thenAnswer(new Answer>() { + @Override + public List answer(InvocationOnMock invocation) throws Throwable { + return leases; + } + }); + when(leaseCoordinator.getCurrentAssignments()).thenAnswer(new Answer>() { + @Override + public List answer(InvocationOnMock invocation) throws Throwable { + return currentAssignments; + } + }); + + IRecordProcessor processor = mock(IRecordProcessor.class); + when(recordProcessorFactory.createProcessor()).thenReturn(processor); + + Worker worker = new InjectableWorker("testRequestShutdown", recordProcessorFactory, streamConfig, + INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, + cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, + taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization) { + @Override + void postConstruct() { + this.gracefuleShutdownStarted = true; + } + }; + + when(executorService.submit(Matchers.> any())) + .thenAnswer(new ShutdownHandlingAnswer(taskFuture)); + when(taskFuture.isDone()).thenReturn(true); + when(taskFuture.get()).thenReturn(taskResult); + + worker.runProcessLoop(); + + verify(executorService, atLeastOnce()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class)) + .and(TaskTypeMatcher.isOfType(TaskType.BLOCK_ON_PARENT_SHARDS)))); + + worker.runProcessLoop(); + + verify(executorService, atLeastOnce()).submit(argThat( + both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.INITIALIZE)))); + + assertThat(worker.hasGracefulShutdownStarted(), equalTo(true)); + worker.createWorkerShutdownCallable().call(); + + } + + @Test + public void testGracefulShutdownSingleFuture() throws Exception { + + IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + StreamConfig streamConfig = mock(StreamConfig.class); + IMetricsFactory metricsFactory = mock(IMetricsFactory.class); + + ExtendedSequenceNumber checkpoint = new ExtendedSequenceNumber("123", 0L); + KinesisClientLeaseBuilder builder = new KinesisClientLeaseBuilder().withCheckpoint(checkpoint) + .withConcurrencyToken(UUID.randomUUID()).withLastCounterIncrementNanos(0L).withLeaseCounter(0L) + .withOwnerSwitchesSinceCheckpoint(0L).withLeaseOwner("Self"); + + final List leases = new ArrayList<>(); + final List currentAssignments = new ArrayList<>(); + KinesisClientLease lease = builder.withLeaseKey(String.format("shardId-%03d", 1)).build(); + leases.add(lease); + currentAssignments.add(new ShardInfo(lease.getLeaseKey(), lease.getConcurrencyToken().toString(), + lease.getParentShardIds(), lease.getCheckpoint())); + + when(leaseCoordinator.getAssignments()).thenAnswer(new Answer>() { + @Override + public List answer(InvocationOnMock invocation) throws Throwable { + return leases; + } + }); + when(leaseCoordinator.getCurrentAssignments()).thenAnswer(new Answer>() { + @Override + public List answer(InvocationOnMock invocation) throws Throwable { + return currentAssignments; + } + }); + + IRecordProcessor processor = mock(IRecordProcessor.class); + when(recordProcessorFactory.createProcessor()).thenReturn(processor); + + GracefulShutdownCoordinator coordinator = mock(GracefulShutdownCoordinator.class); + when(coordinator.createGracefulShutdownCallable(any(Callable.class))).thenReturn(() -> true); + + Future gracefulShutdownFuture = mock(Future.class); + + when(coordinator.startGracefulShutdown(any(Callable.class))).thenReturn(gracefulShutdownFuture); + + Worker worker = new InjectableWorker("testRequestShutdown", recordProcessorFactory, streamConfig, + INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, + cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, + taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization) { + @Override + void postConstruct() { + this.gracefulShutdownCoordinator = coordinator; + } + }; + + when(executorService.submit(Matchers.> any())) + .thenAnswer(new ShutdownHandlingAnswer(taskFuture)); + when(taskFuture.isDone()).thenReturn(true); + when(taskFuture.get()).thenReturn(taskResult); + + worker.runProcessLoop(); + + verify(executorService, atLeastOnce()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class)) + .and(TaskTypeMatcher.isOfType(TaskType.BLOCK_ON_PARENT_SHARDS)))); + + worker.runProcessLoop(); + + verify(executorService, atLeastOnce()).submit(argThat( + both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.INITIALIZE)))); + + Future firstFuture = worker.startGracefulShutdown(); + Future secondFuture = worker.startGracefulShutdown(); + + assertThat(firstFuture, equalTo(secondFuture)); + verify(coordinator).startGracefulShutdown(any(Callable.class)); + + } + @Test public void testRequestShutdownNoLeases() throws Exception { @@ -830,7 +970,7 @@ public class WorkerTest { verify(executorService, never()).submit(argThat( both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.INITIALIZE)))); - worker.requestShutdown(); + worker.createWorkerShutdownCallable().call(); worker.runProcessLoop(); verify(executorService, never()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class)) @@ -909,7 +1049,7 @@ public class WorkerTest { .withField(InitializeTask.class, "shardInfo", equalTo(shardInfo2))))); worker.getShardInfoShardConsumerMap().remove(shardInfo2); - worker.requestShutdown(); + worker.createWorkerShutdownCallable().call(); leases.remove(1); currentAssignments.remove(1); worker.runProcessLoop(); @@ -1194,6 +1334,24 @@ public class WorkerTest { } + private abstract class InjectableWorker extends Worker { + InjectableWorker(String applicationName, IRecordProcessorFactory recordProcessorFactory, + StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream, + long parentShardPollIntervalMillis, long shardSyncIdleTimeMillis, + boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint, + KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, + IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, + boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) { + super(applicationName, recordProcessorFactory, streamConfig, initialPositionInStream, + parentShardPollIntervalMillis, shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, + checkpoint, leaseCoordinator, execService, metricsFactory, taskBackoffTimeMillis, + failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, shardPrioritization); + postConstruct(); + } + + abstract void postConstruct(); + } + private KinesisClientLease makeLease(ExtendedSequenceNumber checkpoint, int shardId) { return new KinesisClientLeaseBuilder().withCheckpoint(checkpoint).withConcurrencyToken(UUID.randomUUID()) .withLastCounterIncrementNanos(0L).withLeaseCounter(0L).withOwnerSwitchesSinceCheckpoint(0L) diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxyTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxyTest.java index db0e3d0c..2b7aa0b7 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxyTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxyTest.java @@ -1,3 +1,17 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ package com.amazonaws.services.kinesis.clientlibrary.proxies; import static org.hamcrest.Matchers.both; diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/types/ShutdownReasonTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/types/ShutdownReasonTest.java index 011e0721..0b9a72f1 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/types/ShutdownReasonTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/types/ShutdownReasonTest.java @@ -1,3 +1,17 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ package com.amazonaws.services.kinesis.clientlibrary.types; import static org.hamcrest.CoreMatchers.equalTo; diff --git a/src/test/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseBuilder.java b/src/test/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseBuilder.java index 9bfea4d3..2e8879fe 100644 --- a/src/test/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseBuilder.java +++ b/src/test/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseBuilder.java @@ -1,3 +1,17 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ package com.amazonaws.services.kinesis.leases.impl; import java.util.HashSet; diff --git a/src/test/java/com/amazonaws/services/kinesis/multilang/Matchers.java b/src/test/java/com/amazonaws/services/kinesis/multilang/Matchers.java index b84d61a0..92ac15f7 100644 --- a/src/test/java/com/amazonaws/services/kinesis/multilang/Matchers.java +++ b/src/test/java/com/amazonaws/services/kinesis/multilang/Matchers.java @@ -1,3 +1,17 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ package com.amazonaws.services.kinesis.multilang; import static org.hamcrest.CoreMatchers.equalTo; diff --git a/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolTest.java b/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolTest.java index e1827799..3f35b8fa 100644 --- a/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolTest.java @@ -14,64 +14,78 @@ */ package com.amazonaws.services.kinesis.multilang; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.junit.Assert.assertThat; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyLong; -import static org.mockito.Matchers.anyString; -import static org.mockito.Matchers.argThat; -import static org.mockito.Mockito.timeout; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; - -import org.junit.Before; -import org.junit.Test; -import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; - +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; - import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage; import com.amazonaws.services.kinesis.multilang.messages.Message; import com.amazonaws.services.kinesis.multilang.messages.ProcessRecordsMessage; import com.amazonaws.services.kinesis.multilang.messages.StatusMessage; import com.google.common.util.concurrent.SettableFuture; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.runners.MockitoJUnitRunner; +import org.mockito.stubbing.Answer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.argThat; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) public class MultiLangProtocolTest { private static final List EMPTY_RECORD_LIST = Collections.emptyList(); + @Mock private MultiLangProtocol protocol; + @Mock private MessageWriter messageWriter; + @Mock private MessageReader messageReader; private String shardId; + @Mock private IRecordProcessorCheckpointer checkpointer; + @Mock + private KinesisClientLibConfiguration configuration; @Before public void setup() { this.shardId = "shard-id-123"; - messageWriter = Mockito.mock(MessageWriter.class); - messageReader = Mockito.mock(MessageReader.class); - protocol = new MultiLangProtocol(messageReader, messageWriter, new InitializationInput().withShardId(shardId)); - checkpointer = Mockito.mock(IRecordProcessorCheckpointer.class); + protocol = new MultiLangProtocolForTesting(messageReader, messageWriter, + new InitializationInput().withShardId(shardId), configuration); + + when(configuration.getTimeoutInSeconds()).thenReturn(Optional.empty()); } private Future buildFuture(T value) { @@ -165,7 +179,10 @@ public class MultiLangProtocolTest { this.add(new StatusMessage("processRecords")); } })); - assertThat(protocol.processRecords(new ProcessRecordsInput().withRecords(EMPTY_RECORD_LIST).withCheckpointer(checkpointer)), equalTo(true)); + + boolean result = protocol.processRecords(new ProcessRecordsInput().withRecords(EMPTY_RECORD_LIST).withCheckpointer(checkpointer)); + + assertThat(result, equalTo(true)); verify(checkpointer, timeout(1)).checkpoint(); verify(checkpointer, timeout(1)).checkpoint("123", 0L); @@ -183,4 +200,50 @@ public class MultiLangProtocolTest { })); assertThat(protocol.processRecords(new ProcessRecordsInput().withRecords(EMPTY_RECORD_LIST).withCheckpointer(checkpointer)), equalTo(false)); } + + @Test(expected = NullPointerException.class) + public void waitForStatusMessageTimeoutTest() throws InterruptedException, TimeoutException, ExecutionException { + when(messageWriter.writeProcessRecordsMessage(any(ProcessRecordsInput.class))).thenReturn(buildFuture(true)); + Future future = Mockito.mock(Future.class); + when(messageReader.getNextMessageFromSTDOUT()).thenReturn(future); + when(configuration.getTimeoutInSeconds()).thenReturn(Optional.of(5)); + when(future.get(anyInt(), eq(TimeUnit.SECONDS))).thenThrow(TimeoutException.class); + protocol = new MultiLangProtocolForTesting(messageReader, + messageWriter, + new InitializationInput().withShardId(shardId), + configuration); + + protocol.processRecords(new ProcessRecordsInput().withRecords(EMPTY_RECORD_LIST)); + } + + @Test + public void waitForStatusMessageSuccessTest() { + when(messageWriter.writeProcessRecordsMessage(any(ProcessRecordsInput.class))).thenReturn(buildFuture(true)); + when(messageReader.getNextMessageFromSTDOUT()).thenReturn(buildFuture(new StatusMessage("processRecords"), Message.class)); + when(configuration.getTimeoutInSeconds()).thenReturn(Optional.of(5)); + + assertTrue(protocol.processRecords(new ProcessRecordsInput().withRecords(EMPTY_RECORD_LIST))); + } + + private class MultiLangProtocolForTesting extends MultiLangProtocol { + /** + * Constructor. + * + * @param messageReader A message reader. + * @param messageWriter A message writer. + * @param initializationInput + * @param configuration + */ + MultiLangProtocolForTesting(final MessageReader messageReader, + final MessageWriter messageWriter, + final InitializationInput initializationInput, + final KinesisClientLibConfiguration configuration) { + super(messageReader, messageWriter, initializationInput, configuration); + } + + @Override + protected void haltJvm(final int exitStatus) { + throw new NullPointerException(); + } + } } diff --git a/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorFactoryTest.java b/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorFactoryTest.java index a8f5885b..aa6aceea 100644 --- a/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorFactoryTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorFactoryTest.java @@ -14,16 +14,24 @@ */ package com.amazonaws.services.kinesis.multilang; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; import org.junit.Assert; import org.junit.Test; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; +@RunWith(MockitoJUnitRunner.class) public class StreamingRecordProcessorFactoryTest { + @Mock + private KinesisClientLibConfiguration configuration; + @Test public void createProcessorTest() { - MultiLangRecordProcessorFactory factory = new MultiLangRecordProcessorFactory("somecommand", null); + MultiLangRecordProcessorFactory factory = new MultiLangRecordProcessorFactory("somecommand", null, configuration); IRecordProcessor processor = factory.createProcessor(); Assert.assertEquals("Should have constructed a StreamingRecordProcessor", MultiLangRecordProcessor.class, diff --git a/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java b/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java index 3541be25..d2659349 100644 --- a/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java @@ -14,42 +14,14 @@ */ package com.amazonaws.services.kinesis.multilang; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyLong; -import static org.mockito.Matchers.anyString; -import static org.mockito.Matchers.argThat; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; - -import com.amazonaws.services.kinesis.clientlibrary.interfaces.IPreparedCheckpointer; -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.runners.MockitoJUnitRunner; -import org.mockito.stubbing.Answer; - import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IPreparedCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; @@ -60,6 +32,35 @@ import com.amazonaws.services.kinesis.multilang.messages.ProcessRecordsMessage; import com.amazonaws.services.kinesis.multilang.messages.ShutdownMessage; import com.amazonaws.services.kinesis.multilang.messages.StatusMessage; import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.runners.MockitoJUnitRunner; +import org.mockito.stubbing.Answer; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.argThat; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) public class StreamingRecordProcessorTest { @@ -137,6 +138,9 @@ public class StreamingRecordProcessorTest { private MultiLangRecordProcessor recordProcessor; + @Mock + private KinesisClientLibConfiguration configuration; + @Before public void prepare() throws IOException, InterruptedException, ExecutionException { // Fake command @@ -150,10 +154,11 @@ public class StreamingRecordProcessorTest { messageWriter = Mockito.mock(MessageWriter.class); messageReader = Mockito.mock(MessageReader.class); errorReader = Mockito.mock(DrainChildSTDERRTask.class); + when(configuration.getTimeoutInSeconds()).thenReturn(Optional.empty()); recordProcessor = new MultiLangRecordProcessor(new ProcessBuilder(), executor, new ObjectMapper(), messageWriter, - messageReader, errorReader) { + messageReader, errorReader, configuration) { // Just don't do anything when we exit. void exit() {