merge with upstream

This commit is contained in:
Walid Baruni 2017-08-03 12:08:48 -07:00
commit 061e84c7dd
40 changed files with 1650 additions and 833 deletions

View file

@ -1,6 +1,6 @@
language: java language: java
jdk: jdk:
- openjdk7 - openjdk8
- oraclejdk7
- oraclejdk8 - oraclejdk8
sudo: false sudo: false
dist: trusty

View file

@ -2,7 +2,7 @@ Manifest-Version: 1.0
Bundle-ManifestVersion: 2 Bundle-ManifestVersion: 2
Bundle-Name: Amazon Kinesis Client Library for Java Bundle-Name: Amazon Kinesis Client Library for Java
Bundle-SymbolicName: com.amazonaws.kinesisclientlibrary;singleton:=true Bundle-SymbolicName: com.amazonaws.kinesisclientlibrary;singleton:=true
Bundle-Version: 1.7.6 Bundle-Version: 1.8.1
Bundle-Vendor: Amazon Technologies, Inc Bundle-Vendor: Amazon Technologies, Inc
Bundle-RequiredExecutionEnvironment: JavaSE-1.7 Bundle-RequiredExecutionEnvironment: JavaSE-1.7
Require-Bundle: org.apache.commons.codec;bundle-version="1.6", Require-Bundle: org.apache.commons.codec;bundle-version="1.6",

View file

@ -23,12 +23,31 @@ The **Amazon Kinesis Client Library for Java** (Amazon KCL) enables Java develop
After you've downloaded the code from GitHub, you can build it using Maven. To disable GPG signing in the build, use this command: `mvn clean install -Dgpg.skip=true` After you've downloaded the code from GitHub, you can build it using Maven. To disable GPG signing in the build, use this command: `mvn clean install -Dgpg.skip=true`
## Integration with the Kinesis Producer Library ## Integration with the Kinesis Producer Library
For producer-side developers using the **[Kinesis Producer Library (KPL)][kinesis-guide-kpl]**, the KCL integrates without additional effort.  When the KCL retrieves an aggregated Amazon Kinesis record consisting of multiple KPL user records, it will automatically invoke the KPL to extract the individual user records before returning them to the user. For producer-side developers using the **[Kinesis Producer Library (KPL)][kinesis-guide-kpl]**, the KCL integrates without additional effort. When the KCL retrieves an aggregated Amazon Kinesis record consisting of multiple KPL user records, it will automatically invoke the KPL to extract the individual user records before returning them to the user.
## Amazon KCL support for other languages ## Amazon KCL support for other languages
To make it easier for developers to write record processors in other languages, we have implemented a Java based daemon, called MultiLangDaemon that does all the heavy lifting. Our approach has the daemon spawn a sub-process, which in turn runs the record processor, which can be written in any language. The MultiLangDaemon process and the record processor sub-process communicate with each other over [STDIN and STDOUT using a defined protocol][multi-lang-protocol]. There will be a one to one correspondence amongst record processors, child processes, and shards. For Python developers specifically, we have abstracted these implementation details away and [expose an interface][kclpy] that enables you to focus on writing record processing logic in Python. This approach enables KCL to be language agnostic, while providing identical features and similar parallel processing model across all languages. To make it easier for developers to write record processors in other languages, we have implemented a Java based daemon, called MultiLangDaemon that does all the heavy lifting. Our approach has the daemon spawn a sub-process, which in turn runs the record processor, which can be written in any language. The MultiLangDaemon process and the record processor sub-process communicate with each other over [STDIN and STDOUT using a defined protocol][multi-lang-protocol]. There will be a one to one correspondence amongst record processors, child processes, and shards. For Python developers specifically, we have abstracted these implementation details away and [expose an interface][kclpy] that enables you to focus on writing record processing logic in Python. This approach enables KCL to be language agnostic, while providing identical features and similar parallel processing model across all languages.
## Release Notes ## Release Notes
### Release 1.8.1 (August 2, 2017)
* Support timeouts for calls to the MultiLang Daemon
This adds support for setting a timeout when dispatching records to the client record processor. If the record processor doesn't respond within the timeout the parent Java process will be terminated. This is a temporary fix to handle cases where the KCL becomes blocked while waiting for a client record processor.
The timeout for the this can be set by adding `timeoutInSeconds = <timeout value>`. The default for this is no timeout.
__Setting this can cause the KCL to exit suddenly, before using this ensure that you have an automated restart for your application__
* [PR #195](https://github.com/awslabs/amazon-kinesis-client/pull/195)
* [Issue #185](https://github.com/awslabs/amazon-kinesis-client/issues/185)
### Release 1.8.0 (July 25, 2017)
* Execute graceful shutdown on its own thread
* [PR #191](https://github.com/awslabs/amazon-kinesis-client/pull/191)
* [Issue #167](https://github.com/awslabs/amazon-kinesis-client/issues/167)
* Added support for controlling the size of the lease renewer thread pool
* [PR #177](https://github.com/awslabs/amazon-kinesis-client/pull/177)
* [Issue #171](https://github.com/awslabs/amazon-kinesis-client/issues/171)
* Require Java 8 and later
__Java 8 is now required for versions 1.8.0 of the amazon-kinesis-client and later.__
* [PR #176](https://github.com/awslabs/amazon-kinesis-client/issues/176)
### Release 1.7.6 (June 21, 2017) ### Release 1.7.6 (June 21, 2017)
* Added support for graceful shutdown in MultiLang Clients * Added support for graceful shutdown in MultiLang Clients
* [PR #174](https://github.com/awslabs/amazon-kinesis-client/pull/174) * [PR #174](https://github.com/awslabs/amazon-kinesis-client/pull/174)

View file

@ -1,10 +0,0 @@
source.. = src/main/java,\
src/main/resources
output.. = bin/
bin.includes = LICENSE.txt,\
NOTICE.txt,\
META-INF/,\
.
jre.compilation.profile = JavaSE-1.7

View file

@ -6,7 +6,7 @@
<artifactId>amazon-kinesis-client</artifactId> <artifactId>amazon-kinesis-client</artifactId>
<packaging>jar</packaging> <packaging>jar</packaging>
<name>Amazon Kinesis Client Library for Java</name> <name>Amazon Kinesis Client Library for Java</name>
<version>1.7.6</version> <version>1.8.1</version>
<description>The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data <description>The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data
from Amazon Kinesis. from Amazon Kinesis.
</description> </description>
@ -25,7 +25,7 @@
</licenses> </licenses>
<properties> <properties>
<aws-java-sdk.version>1.11.151</aws-java-sdk.version> <aws-java-sdk.version>1.11.171</aws-java-sdk.version>
<sqlite4java.version>1.0.392</sqlite4java.version> <sqlite4java.version>1.0.392</sqlite4java.version>
<sqlite4java.native>libsqlite4java</sqlite4java.native> <sqlite4java.native>libsqlite4java</sqlite4java.native>
<sqlite4java.libpath>${project.build.directory}/test-lib</sqlite4java.libpath> <sqlite4java.libpath>${project.build.directory}/test-lib</sqlite4java.libpath>
@ -131,8 +131,8 @@
<artifactId>maven-compiler-plugin</artifactId> <artifactId>maven-compiler-plugin</artifactId>
<version>3.2</version> <version>3.2</version>
<configuration> <configuration>
<source>1.7</source> <source>1.8</source>
<target>1.7</target> <target>1.8</target>
<encoding>UTF-8</encoding> <encoding>UTF-8</encoding>
</configuration> </configuration>
</plugin> </plugin>

View file

@ -1,3 +1,17 @@
/*
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.interfaces.v2; package com.amazonaws.services.kinesis.clientlibrary.interfaces.v2;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;

View file

@ -1,3 +1,17 @@
/*
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker; package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
/** /**

View file

@ -0,0 +1,33 @@
/*
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import lombok.Data;
import java.util.concurrent.CountDownLatch;
@Data
class GracefulShutdownContext {
private final CountDownLatch shutdownCompleteLatch;
private final CountDownLatch notificationCompleteLatch;
private final Worker worker;
static GracefulShutdownContext SHUTDOWN_ALREADY_COMPLETED = new GracefulShutdownContext(null, null, null);
boolean isShutdownAlreadyCompleted() {
return shutdownCompleteLatch == null && notificationCompleteLatch == null && worker == null;
}
}

View file

@ -0,0 +1,163 @@
/*
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
class GracefulShutdownCoordinator {
Future<Boolean> startGracefulShutdown(Callable<Boolean> shutdownCallable) {
FutureTask<Boolean> task = new FutureTask<>(shutdownCallable);
Thread shutdownThread = new Thread(task, "RequestedShutdownThread");
shutdownThread.start();
return task;
}
Callable<Boolean> createGracefulShutdownCallable(Callable<GracefulShutdownContext> startWorkerShutdown) {
return new GracefulShutdownCallable(startWorkerShutdown);
}
static class GracefulShutdownCallable implements Callable<Boolean> {
private static final Log log = LogFactory.getLog(GracefulShutdownCallable.class);
private final Callable<GracefulShutdownContext> startWorkerShutdown;
GracefulShutdownCallable(Callable<GracefulShutdownContext> startWorkerShutdown) {
this.startWorkerShutdown = startWorkerShutdown;
}
private boolean isWorkerShutdownComplete(GracefulShutdownContext context) {
return context.getWorker().isShutdownComplete() || context.getWorker().getShardInfoShardConsumerMap().isEmpty();
}
private String awaitingLogMessage(GracefulShutdownContext context) {
long awaitingNotification = context.getNotificationCompleteLatch().getCount();
long awaitingFinalShutdown = context.getShutdownCompleteLatch().getCount();
return String.format(
"Waiting for %d record process to complete shutdown notification, and %d record processor to complete final shutdown ",
awaitingNotification, awaitingFinalShutdown);
}
private String awaitingFinalShutdownMessage(GracefulShutdownContext context) {
long outstanding = context.getShutdownCompleteLatch().getCount();
return String.format("Waiting for %d record processors to complete final shutdown", outstanding);
}
private boolean waitForRecordProcessors(GracefulShutdownContext context) {
//
// Awaiting for all ShardConsumer/RecordProcessors to be notified that a shutdown has been requested.
// There is the possibility of a race condition where a lease is terminated after the shutdown request
// notification is started, but before the ShardConsumer is sent the notification. In this case the
// ShardConsumer would start the lease loss shutdown, and may never call the notification methods.
//
try {
while (!context.getNotificationCompleteLatch().await(1, TimeUnit.SECONDS)) {
if (Thread.interrupted()) {
throw new InterruptedException();
}
log.info(awaitingLogMessage(context));
if (workerShutdownWithRemaining(context.getShutdownCompleteLatch().getCount(), context)) {
return false;
}
}
} catch (InterruptedException ie) {
log.warn("Interrupted while waiting for notification complete, terminating shutdown. "
+ awaitingLogMessage(context));
return false;
}
if (Thread.interrupted()) {
log.warn("Interrupted before worker shutdown, terminating shutdown");
return false;
}
//
// Once all record processors have been notified of the shutdown it is safe to allow the worker to
// start its shutdown behavior. Once shutdown starts it will stop renewer, and drop any remaining leases.
//
context.getWorker().shutdown();
if (Thread.interrupted()) {
log.warn("Interrupted after worker shutdown, terminating shutdown");
return false;
}
//
// Want to wait for all the remaining ShardConsumers/RecordProcessor's to complete their final shutdown
// processing. This should really be a no-op since as part of the notification completion the lease for
// ShardConsumer is terminated.
//
try {
while (!context.getShutdownCompleteLatch().await(1, TimeUnit.SECONDS)) {
if (Thread.interrupted()) {
throw new InterruptedException();
}
log.info(awaitingFinalShutdownMessage(context));
if (workerShutdownWithRemaining(context.getShutdownCompleteLatch().getCount(), context)) {
return false;
}
}
} catch (InterruptedException ie) {
log.warn("Interrupted while waiting for shutdown completion, terminating shutdown. "
+ awaitingFinalShutdownMessage(context));
return false;
}
return true;
}
/**
* This checks to see if the worker has already hit it's shutdown target, while there is outstanding record
* processors. This maybe a little racy due to when the value of outstanding is retrieved. In general though the
* latch should be decremented before the shutdown completion.
*
* @param outstanding
* the number of record processor still awaiting shutdown.
*/
private boolean workerShutdownWithRemaining(long outstanding, GracefulShutdownContext context) {
if (isWorkerShutdownComplete(context)) {
if (outstanding != 0) {
log.info("Shutdown completed, but shutdownCompleteLatch still had outstanding " + outstanding
+ " with a current value of " + context.getShutdownCompleteLatch().getCount() + ". shutdownComplete: "
+ context.getWorker().isShutdownComplete() + " -- Consumer Map: "
+ context.getWorker().getShardInfoShardConsumerMap().size());
return true;
}
}
return false;
}
@Override
public Boolean call() throws Exception {
GracefulShutdownContext context;
try {
context = startWorkerShutdown.call();
} catch (Exception ex) {
log.warn("Caught exception while requesting initial worker shutdown.", ex);
throw ex;
}
return context.isShutdownAlreadyCompleted() || waitForRecordProcessors(context);
}
}
}

View file

@ -15,8 +15,11 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker; package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.util.Date; import java.util.Date;
import java.util.Optional;
import java.util.Set; import java.util.Set;
import org.apache.commons.lang.Validate;
import com.amazonaws.ClientConfiguration; import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.regions.RegionUtils; import com.amazonaws.regions.RegionUtils;
@ -25,6 +28,8 @@ import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope;
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import lombok.Getter;
/** /**
* Configuration for the Amazon Kinesis Client Library. * Configuration for the Amazon Kinesis Client Library.
*/ */
@ -121,7 +126,7 @@ public class KinesisClientLibConfiguration {
/** /**
* User agent set when Amazon Kinesis Client Library makes AWS requests. * User agent set when Amazon Kinesis Client Library makes AWS requests.
*/ */
public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.7.6"; public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.8.1";
/** /**
* KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls * KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls
@ -155,10 +160,10 @@ public class KinesisClientLibConfiguration {
*/ */
public static final int DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY = 10; public static final int DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY = 10;
/* /**
* The Worker will skip shard sync during initialization if there are one or more leases in the lease table. * The Worker will skip shard sync during initialization if there are one or more leases in the lease table. This
* This assumes that the shards and leases are in-sync. * assumes that the shards and leases are in-sync. This enables customers to choose faster startup times (e.g.
* This enables customers to choose faster startup times (e.g. during incremental deployments of an application). * during incremental deployments of an application).
*/ */
public static final boolean DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST = false; public static final boolean DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST = false;
@ -167,6 +172,11 @@ public class KinesisClientLibConfiguration {
*/ */
public static final ShardPrioritization DEFAULT_SHARD_PRIORITIZATION = new NoOpShardPrioritization(); public static final ShardPrioritization DEFAULT_SHARD_PRIORITIZATION = new NoOpShardPrioritization();
/**
* The size of the thread pool to create for the lease renewer to use.
*/
public static final int DEFAULT_MAX_LEASE_RENEWAL_THREADS = 20;
private String applicationName; private String applicationName;
private String tableName; private String tableName;
private String streamName; private String streamName;
@ -204,6 +214,12 @@ public class KinesisClientLibConfiguration {
private boolean skipShardSyncAtWorkerInitializationIfLeasesExist; private boolean skipShardSyncAtWorkerInitializationIfLeasesExist;
private ShardPrioritization shardPrioritization; private ShardPrioritization shardPrioritization;
@Getter
private Optional<Integer> timeoutInSeconds = Optional.empty();
@Getter
private int maxLeaseRenewalThreads = DEFAULT_MAX_LEASE_RENEWAL_THREADS;
/** /**
* Constructor. * Constructor.
* *
@ -1075,4 +1091,31 @@ public class KinesisClientLibConfiguration {
this.shardPrioritization = shardPrioritization; this.shardPrioritization = shardPrioritization;
return this; return this;
} }
/**
* Sets the size of the thread pool that will be used to renew leases.
*
* Setting this to low may starve the lease renewal process, and cause the worker to lose leases at a higher rate.
*
* @param maxLeaseRenewalThreads
* the maximum size of the lease renewal thread pool
* @throws IllegalArgumentException
* if maxLeaseRenewalThreads is <= 0
* @return this configuration object
*/
public KinesisClientLibConfiguration withMaxLeaseRenewalThreads(int maxLeaseRenewalThreads) {
Validate.isTrue(maxLeaseRenewalThreads > 2,
"The maximum number of lease renewal threads must be greater than or equal to 2.");
this.maxLeaseRenewalThreads = maxLeaseRenewalThreads;
return this;
}
/**
* @param timeoutInSeconds The timeout in seconds to wait for the MultiLangProtocol to wait for
*/
public void withTimeoutInSeconds(final int timeoutInSeconds) {
this.timeoutInSeconds = Optional.of(timeoutInSeconds);
}
} }

View file

@ -101,9 +101,10 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator<KinesisClientLea
long epsilonMillis, long epsilonMillis,
int maxLeasesForWorker, int maxLeasesForWorker,
int maxLeasesToStealAtOneTime, int maxLeasesToStealAtOneTime,
int maxLeaseRenewerThreadCount,
IMetricsFactory metricsFactory) { IMetricsFactory metricsFactory) {
super(leaseManager, workerIdentifier, leaseDurationMillis, epsilonMillis, maxLeasesForWorker, super(leaseManager, workerIdentifier, leaseDurationMillis, epsilonMillis, maxLeasesForWorker,
maxLeasesToStealAtOneTime, metricsFactory); maxLeasesToStealAtOneTime, maxLeaseRenewerThreadCount, metricsFactory);
this.leaseManager = leaseManager; this.leaseManager = leaseManager;
} }

View file

@ -1,3 +1,17 @@
/*
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker; package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.util.List; import java.util.List;

View file

@ -1,3 +1,17 @@
/*
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker; package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.util.ArrayList; import java.util.ArrayList;

View file

@ -1,3 +1,17 @@
/*
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker; package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;

View file

@ -1,3 +1,17 @@
/*
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker; package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.util.List; import java.util.List;

View file

@ -1,155 +0,0 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Used as a response from the {@link Worker#requestShutdown()} to allow callers to wait until shutdown is complete.
*/
class ShutdownFuture implements Future<Void> {
private static final Log log = LogFactory.getLog(ShutdownFuture.class);
private final CountDownLatch shutdownCompleteLatch;
private final CountDownLatch notificationCompleteLatch;
private final Worker worker;
ShutdownFuture(CountDownLatch shutdownCompleteLatch, CountDownLatch notificationCompleteLatch, Worker worker) {
this.shutdownCompleteLatch = shutdownCompleteLatch;
this.notificationCompleteLatch = notificationCompleteLatch;
this.worker = worker;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
throw new UnsupportedOperationException("Cannot cancel a shutdown process");
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public boolean isDone() {
return isWorkerShutdownComplete();
}
private boolean isWorkerShutdownComplete() {
return worker.isShutdownComplete() || worker.getShardInfoShardConsumerMap().isEmpty();
}
private long outstandingRecordProcessors(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
final long startNanos = System.nanoTime();
//
// Awaiting for all ShardConsumer/RecordProcessors to be notified that a shutdown has been requested.
// There is the possibility of a race condition where a lease is terminated after the shutdown request
// notification is started, but before the ShardConsumer is sent the notification. In this case the
// ShardConsumer would start the lease loss shutdown, and may never call the notification methods.
//
if (!notificationCompleteLatch.await(timeout, unit)) {
long awaitingNotification = notificationCompleteLatch.getCount();
long awaitingFinalShutdown = shutdownCompleteLatch.getCount();
log.info("Awaiting " + awaitingNotification + " record processors to complete shutdown notification, and "
+ awaitingFinalShutdown + " awaiting final shutdown");
if (awaitingFinalShutdown != 0) {
//
// The number of record processor awaiting final shutdown should be a superset of the those awaiting
// notification
//
return checkWorkerShutdownMiss(awaitingFinalShutdown);
}
}
long remaining = remainingTimeout(timeout, unit, startNanos);
throwTimeoutMessageIfExceeded(remaining, "Notification hasn't completed within timeout time.");
//
// Once all record processors have been notified of the shutdown it is safe to allow the worker to
// start its shutdown behavior. Once shutdown starts it will stop renewer, and drop any remaining leases.
//
worker.shutdown();
remaining = remainingTimeout(timeout, unit, startNanos);
throwTimeoutMessageIfExceeded(remaining, "Shutdown hasn't completed within timeout time.");
//
// Want to wait for all the remaining ShardConsumers/RecordProcessor's to complete their final shutdown
// processing. This should really be a no-op since as part of the notification completion the lease for
// ShardConsumer is terminated.
//
if (!shutdownCompleteLatch.await(remaining, TimeUnit.NANOSECONDS)) {
long outstanding = shutdownCompleteLatch.getCount();
log.info("Awaiting " + outstanding + " record processors to complete final shutdown");
return checkWorkerShutdownMiss(outstanding);
}
return 0;
}
private long remainingTimeout(long timeout, TimeUnit unit, long startNanos) {
long checkNanos = System.nanoTime() - startNanos;
return unit.toNanos(timeout) - checkNanos;
}
private void throwTimeoutMessageIfExceeded(long remainingNanos, String message) throws TimeoutException {
if (remainingNanos <= 0) {
throw new TimeoutException(message);
}
}
/**
* This checks to see if the worker has already hit it's shutdown target, while there is outstanding record
* processors. This maybe a little racy due to when the value of outstanding is retrieved. In general though the
* latch should be decremented before the shutdown completion.
*
* @param outstanding
* the number of record processor still awaiting shutdown.
* @return the number of record processors awaiting shutdown, or 0 if the worker believes it's shutdown already.
*/
private long checkWorkerShutdownMiss(long outstanding) {
if (isWorkerShutdownComplete()) {
if (outstanding != 0) {
log.info("Shutdown completed, but shutdownCompleteLatch still had outstanding " + outstanding
+ " with a current value of " + shutdownCompleteLatch.getCount() + ". shutdownComplete: "
+ worker.isShutdownComplete() + " -- Consumer Map: "
+ worker.getShardInfoShardConsumerMap().size());
}
return 0;
}
return outstanding;
}
@Override
public Void get() throws InterruptedException, ExecutionException {
boolean complete = false;
do {
try {
long outstanding = outstandingRecordProcessors(1, TimeUnit.SECONDS);
complete = outstanding == 0;
log.info("Awaiting " + outstanding + " consumer(s) to finish shutdown.");
} catch (TimeoutException te) {
log.info("Timeout while waiting for completion: " + te.getMessage());
}
} while(!complete);
return null;
}
@Override
public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
long outstanding = outstandingRecordProcessors(timeout, unit);
if (outstanding != 0) {
throw new TimeoutException("Awaiting " + outstanding + " record processors to shutdown.");
}
return null;
}
}

View file

@ -1,3 +1,17 @@
/*
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker; package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;

View file

@ -1,3 +1,17 @@
/*
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker; package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;

View file

@ -1,3 +1,17 @@
/*
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker; package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import lombok.Getter; import lombok.Getter;

View file

@ -1,16 +1,16 @@
/* /*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* *
* Licensed under the Amazon Software License (the "License"). * Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License. * You may not use this file except in compliance with the License.
* A copy of the License is located at * A copy of the License is located at
* *
* http://aws.amazon.com/asl/ * http://aws.amazon.com/asl/
* *
* or in the "license" file accompanying this file. This file is distributed * or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing * express or implied. See the License for the specific language governing
* permissions and limitations under the License. * permissions and limitations under the License.
*/ */
package com.amazonaws.services.kinesis.clientlibrary.lib.worker; package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
@ -18,15 +18,18 @@ import java.util.Collection;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue; import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -52,14 +55,12 @@ import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
/** /**
* Worker is the high level class that Kinesis applications use to start * Worker is the high level class that Kinesis applications use to start processing data. It initializes and oversees
* processing data. It initializes and oversees different components (e.g. * different components (e.g. syncing shard and lease information, tracking shard assignments, and processing data from
* syncing shard and lease information, tracking shard assignments, and * the shards).
* processing data from the shards).
*/ */
public class Worker implements Runnable { public class Worker implements Runnable {
@ -77,7 +78,7 @@ public class Worker implements Runnable {
private final long idleTimeInMilliseconds; private final long idleTimeInMilliseconds;
// Backoff time when polling to check if application has finished processing // Backoff time when polling to check if application has finished processing
// parent shards // parent shards
private final long parentShardPollIntervalMillis; private final long parentShardPollIntervalMillis;
private final ExecutorService executorService; private final ExecutorService executorService;
private final IMetricsFactory metricsFactory; private final IMetricsFactory metricsFactory;
// Backoff time when running tasks if they encounter exceptions // Backoff time when running tasks if they encounter exceptions
@ -96,17 +97,27 @@ public class Worker implements Runnable {
// Holds consumers for shards the worker is currently tracking. Key is shard // Holds consumers for shards the worker is currently tracking. Key is shard
// info, value is ShardConsumer. // info, value is ShardConsumer.
private ConcurrentMap<ShardInfo, ShardConsumer> shardInfoShardConsumerMap = private ConcurrentMap<ShardInfo, ShardConsumer> shardInfoShardConsumerMap = new ConcurrentHashMap<ShardInfo, ShardConsumer>();
new ConcurrentHashMap<ShardInfo, ShardConsumer>();
private final boolean cleanupLeasesUponShardCompletion; private final boolean cleanupLeasesUponShardCompletion;
private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist; private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist;
/**
* Used to ensure that only one requestedShutdown is in progress at a time.
*/
private Future<Boolean> gracefulShutdownFuture;
@VisibleForTesting
protected boolean gracefuleShutdownStarted = false;
@VisibleForTesting
protected GracefulShutdownCoordinator gracefulShutdownCoordinator = new GracefulShutdownCoordinator();
/** /**
* Constructor. * Constructor.
* *
* @param recordProcessorFactory Used to get record processor instances for processing data from shards * @param recordProcessorFactory
* @param config Kinesis Client Library configuration * Used to get record processor instances for processing data from shards
* @param config
* Kinesis Client Library configuration
*/ */
public Worker( public Worker(
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory,
@ -117,120 +128,128 @@ public class Worker implements Runnable {
/** /**
* Constructor. * Constructor.
* *
* @param recordProcessorFactory Used to get record processor instances for processing data from shards * @param recordProcessorFactory
* @param config Kinesis Client Library configuration * Used to get record processor instances for processing data from shards
* @param execService ExecutorService to use for processing records (support for multi-threaded * @param config
* consumption) * Kinesis Client Library configuration
* @param execService
* ExecutorService to use for processing records (support for multi-threaded consumption)
*/ */
public Worker( public Worker(
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory,
KinesisClientLibConfiguration config, KinesisClientLibConfiguration config, ExecutorService execService) {
ExecutorService execService) { this(recordProcessorFactory, config,
this(recordProcessorFactory, config, new AmazonKinesisClient(config.getKinesisCredentialsProvider(), new AmazonKinesisClient(config.getKinesisCredentialsProvider(), config.getKinesisClientConfiguration()),
config.getKinesisClientConfiguration()),
new AmazonDynamoDBClient(config.getDynamoDBCredentialsProvider(), new AmazonDynamoDBClient(config.getDynamoDBCredentialsProvider(),
config.getDynamoDBClientConfiguration()), config.getDynamoDBClientConfiguration()),
new AmazonCloudWatchClient(config.getCloudWatchCredentialsProvider(), new AmazonCloudWatchClient(config.getCloudWatchCredentialsProvider(),
config.getCloudWatchClientConfiguration()), execService); config.getCloudWatchClientConfiguration()),
execService);
} }
/** /**
* @param recordProcessorFactory Used to get record processor instances for processing data from shards * @param recordProcessorFactory
* @param config Kinesis Client Library configuration * Used to get record processor instances for processing data from shards
* @param metricsFactory Metrics factory used to emit metrics * @param config
* Kinesis Client Library configuration
* @param metricsFactory
* Metrics factory used to emit metrics
*/ */
public Worker( public Worker(
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory,
KinesisClientLibConfiguration config, KinesisClientLibConfiguration config, IMetricsFactory metricsFactory) {
IMetricsFactory metricsFactory) {
this(recordProcessorFactory, config, metricsFactory, getExecutorService()); this(recordProcessorFactory, config, metricsFactory, getExecutorService());
} }
/** /**
* @param recordProcessorFactory Used to get record processor instances for processing data from shards * @param recordProcessorFactory
* @param config Kinesis Client Library configuration * Used to get record processor instances for processing data from shards
* @param metricsFactory Metrics factory used to emit metrics * @param config
* @param execService ExecutorService to use for processing records (support for multi-threaded * Kinesis Client Library configuration
* consumption) * @param metricsFactory
* Metrics factory used to emit metrics
* @param execService
* ExecutorService to use for processing records (support for multi-threaded consumption)
*/ */
public Worker( public Worker(
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory,
KinesisClientLibConfiguration config, KinesisClientLibConfiguration config, IMetricsFactory metricsFactory, ExecutorService execService) {
IMetricsFactory metricsFactory, this(recordProcessorFactory, config,
ExecutorService execService) { new AmazonKinesisClient(config.getKinesisCredentialsProvider(), config.getKinesisClientConfiguration()),
this(recordProcessorFactory, config, new AmazonKinesisClient(config.getKinesisCredentialsProvider(),
config.getKinesisClientConfiguration()),
new AmazonDynamoDBClient(config.getDynamoDBCredentialsProvider(), new AmazonDynamoDBClient(config.getDynamoDBCredentialsProvider(),
config.getDynamoDBClientConfiguration()), metricsFactory, execService); config.getDynamoDBClientConfiguration()),
metricsFactory, execService);
} }
/** /**
* @param recordProcessorFactory Used to get record processor instances for processing data from shards * @param recordProcessorFactory
* @param config Kinesis Client Library configuration * Used to get record processor instances for processing data from shards
* @param kinesisClient Kinesis Client used for fetching data * @param config
* @param dynamoDBClient DynamoDB client used for checkpoints and tracking leases * Kinesis Client Library configuration
* @param cloudWatchClient CloudWatch Client for publishing metrics * @param kinesisClient
* Kinesis Client used for fetching data
* @param dynamoDBClient
* DynamoDB client used for checkpoints and tracking leases
* @param cloudWatchClient
* CloudWatch Client for publishing metrics
*/ */
public Worker( public Worker(
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory,
KinesisClientLibConfiguration config, KinesisClientLibConfiguration config, AmazonKinesis kinesisClient, AmazonDynamoDB dynamoDBClient,
AmazonKinesis kinesisClient,
AmazonDynamoDB dynamoDBClient,
AmazonCloudWatch cloudWatchClient) { AmazonCloudWatch cloudWatchClient) {
this(recordProcessorFactory, config, kinesisClient, dynamoDBClient, cloudWatchClient, getExecutorService()); this(recordProcessorFactory, config, kinesisClient, dynamoDBClient, cloudWatchClient, getExecutorService());
} }
/** /**
* @param recordProcessorFactory Used to get record processor instances for processing data from shards * @param recordProcessorFactory
* @param config Kinesis Client Library configuration * Used to get record processor instances for processing data from shards
* @param kinesisClient Kinesis Client used for fetching data * @param config
* @param dynamoDBClient DynamoDB client used for checkpoints and tracking leases * Kinesis Client Library configuration
* @param cloudWatchClient CloudWatch Client for publishing metrics * @param kinesisClient
* @param execService ExecutorService to use for processing records (support for multi-threaded * Kinesis Client used for fetching data
* consumption) * @param dynamoDBClient
* DynamoDB client used for checkpoints and tracking leases
* @param cloudWatchClient
* CloudWatch Client for publishing metrics
* @param execService
* ExecutorService to use for processing records (support for multi-threaded consumption)
*/ */
public Worker( public Worker(
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory,
KinesisClientLibConfiguration config, KinesisClientLibConfiguration config, AmazonKinesis kinesisClient, AmazonDynamoDB dynamoDBClient,
AmazonKinesis kinesisClient, AmazonCloudWatch cloudWatchClient, ExecutorService execService) {
AmazonDynamoDB dynamoDBClient, this(recordProcessorFactory, config, kinesisClient, dynamoDBClient, getMetricsFactory(cloudWatchClient, config),
AmazonCloudWatch cloudWatchClient, execService);
ExecutorService execService) {
this(recordProcessorFactory, config, kinesisClient, dynamoDBClient,
getMetricsFactory(cloudWatchClient, config), execService);
} }
/** /**
* @param recordProcessorFactory Used to get record processor instances for processing data from shards * @param recordProcessorFactory
* @param config Kinesis Client Library configuration * Used to get record processor instances for processing data from shards
* @param kinesisClient Kinesis Client used for fetching data * @param config
* @param dynamoDBClient DynamoDB client used for checkpoints and tracking leases * Kinesis Client Library configuration
* @param metricsFactory Metrics factory used to emit metrics * @param kinesisClient
* @param execService ExecutorService to use for processing records (support for multi-threaded * Kinesis Client used for fetching data
* consumption) * @param dynamoDBClient
* DynamoDB client used for checkpoints and tracking leases
* @param metricsFactory
* Metrics factory used to emit metrics
* @param execService
* ExecutorService to use for processing records (support for multi-threaded consumption)
*/ */
public Worker( public Worker(
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory,
KinesisClientLibConfiguration config, KinesisClientLibConfiguration config, AmazonKinesis kinesisClient, AmazonDynamoDB dynamoDBClient,
AmazonKinesis kinesisClient, IMetricsFactory metricsFactory, ExecutorService execService) {
AmazonDynamoDB dynamoDBClient, this(config.getApplicationName(), new V1ToV2RecordProcessorFactoryAdapter(recordProcessorFactory),
IMetricsFactory metricsFactory,
ExecutorService execService) {
this(
config.getApplicationName(),
new V1ToV2RecordProcessorFactoryAdapter(recordProcessorFactory),
new StreamConfig( new StreamConfig(
new KinesisProxyFactory(config.getKinesisCredentialsProvider(), kinesisClient) new KinesisProxyFactory(config.getKinesisCredentialsProvider(), kinesisClient)
.getProxy(config.getStreamName()), .getProxy(config.getStreamName()),
config.getMaxRecords(), config.getIdleTimeBetweenReadsInMillis(), config.getMaxRecords(), config.getIdleTimeBetweenReadsInMillis(),
config.shouldCallProcessRecordsEvenForEmptyRecordList(), config.shouldCallProcessRecordsEvenForEmptyRecordList(),
config.shouldValidateSequenceNumberBeforeCheckpointing(), config.shouldValidateSequenceNumberBeforeCheckpointing(),
config.getInitialPositionInStreamExtended()), config.getInitialPositionInStreamExtended()),
config.getInitialPositionInStreamExtended(), config.getInitialPositionInStreamExtended(), config.getParentShardPollIntervalMillis(),
config.getParentShardPollIntervalMillis(), config.getShardSyncIntervalMillis(), config.shouldCleanupLeasesUponShardCompletion(), null,
config.getShardSyncIntervalMillis(),
config.shouldCleanupLeasesUponShardCompletion(),
null,
new KinesisClientLibLeaseCoordinator( new KinesisClientLibLeaseCoordinator(
new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient), new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient),
config.getWorkerIdentifier(), config.getWorkerIdentifier(),
@ -238,6 +257,7 @@ public class Worker implements Runnable {
config.getEpsilonMillis(), config.getEpsilonMillis(),
config.getMaxLeasesForWorker(), config.getMaxLeasesForWorker(),
config.getMaxLeasesToStealAtOneTime(), config.getMaxLeasesToStealAtOneTime(),
config.getMaxLeaseRenewalThreads(),
metricsFactory) metricsFactory)
.withInitialLeaseTableReadCapacity(config.getInitialLeaseTableReadCapacity()) .withInitialLeaseTableReadCapacity(config.getInitialLeaseTableReadCapacity())
.withInitialLeaseTableWriteCapacity(config.getInitialLeaseTableWriteCapacity()), .withInitialLeaseTableWriteCapacity(config.getInitialLeaseTableWriteCapacity()),
@ -269,48 +289,50 @@ public class Worker implements Runnable {
+ ", and Amazon Kinesis endpoint as " + config.getKinesisEndpoint() + ", and Amazon Kinesis endpoint as " + config.getKinesisEndpoint()
+ ". Amazon Kinesis endpoint will overwrite region name."); + ". Amazon Kinesis endpoint will overwrite region name.");
LOG.debug("The region of Amazon Kinesis client has been overwritten to " + config.getKinesisEndpoint()); LOG.debug("The region of Amazon Kinesis client has been overwritten to " + config.getKinesisEndpoint());
} else { } else {
LOG.debug("The region of Amazon Kinesis client has been set to " + config.getKinesisEndpoint()); LOG.debug("The region of Amazon Kinesis client has been set to " + config.getKinesisEndpoint());
} }
} }
} }
/** /**
* @param applicationName Name of the Kinesis application * @param applicationName
* @param recordProcessorFactory Used to get record processor instances for processing data from shards * Name of the Kinesis application
* @param streamConfig Stream configuration * @param recordProcessorFactory
* @param initialPositionInStream One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. The KinesisClientLibrary will start * Used to get record processor instances for processing data from shards
* fetching data from this location in the stream when an application starts up for the first time and * @param streamConfig
* there are no checkpoints. If there are checkpoints, we start from the checkpoint position. * Stream configuration
* @param parentShardPollIntervalMillis Wait for this long between polls to check if parent shards are done * @param initialPositionInStream
* @param shardSyncIdleTimeMillis Time between tasks to sync leases and Kinesis shards * One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. The KinesisClientLibrary will start fetching data from
* @param cleanupLeasesUponShardCompletion Clean up shards we've finished processing (don't wait till they expire in * this location in the stream when an application starts up for the first time and there are no
* Kinesis) * checkpoints. If there are checkpoints, we start from the checkpoint position.
* @param checkpoint Used to get/set checkpoints * @param parentShardPollIntervalMillis
* @param leaseCoordinator Lease coordinator (coordinates currently owned leases) * Wait for this long between polls to check if parent shards are done
* @param execService ExecutorService to use for processing records (support for multi-threaded * @param shardSyncIdleTimeMillis
* consumption) * Time between tasks to sync leases and Kinesis shards
* @param metricsFactory Metrics factory used to emit metrics * @param cleanupLeasesUponShardCompletion
* @param taskBackoffTimeMillis Backoff period when tasks encounter an exception * Clean up shards we've finished processing (don't wait till they expire in Kinesis)
* @param shardPrioritization Provides prioritization logic to decide which available shards process first * @param checkpoint
* Used to get/set checkpoints
* @param leaseCoordinator
* Lease coordinator (coordinates currently owned leases)
* @param execService
* ExecutorService to use for processing records (support for multi-threaded consumption)
* @param metricsFactory
* Metrics factory used to emit metrics
* @param taskBackoffTimeMillis
* Backoff period when tasks encounter an exception
* @param shardPrioritization
* Provides prioritization logic to decide which available shards process first
*/ */
// NOTE: This has package level access solely for testing // NOTE: This has package level access solely for testing
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES
Worker(String applicationName, Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, StreamConfig streamConfig,
IRecordProcessorFactory recordProcessorFactory, InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis,
StreamConfig streamConfig, long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint,
InitialPositionInStreamExtended initialPositionInStream, KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService,
long parentShardPollIntervalMillis, IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
long shardSyncIdleTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) {
boolean cleanupLeasesUponShardCompletion,
ICheckpoint checkpoint,
KinesisClientLibLeaseCoordinator leaseCoordinator,
ExecutorService execService,
IMetricsFactory metricsFactory,
long taskBackoffTimeMillis,
long failoverTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
ShardPrioritization shardPrioritization) {
this.applicationName = applicationName; this.applicationName = applicationName;
this.recordProcessorFactory = recordProcessorFactory; this.recordProcessorFactory = recordProcessorFactory;
this.streamConfig = streamConfig; this.streamConfig = streamConfig;
@ -322,16 +344,11 @@ public class Worker implements Runnable {
this.executorService = execService; this.executorService = execService;
this.leaseCoordinator = leaseCoordinator; this.leaseCoordinator = leaseCoordinator;
this.metricsFactory = metricsFactory; this.metricsFactory = metricsFactory;
this.controlServer = this.controlServer = new ShardSyncTaskManager(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(),
new ShardSyncTaskManager(streamConfig.getStreamProxy(), initialPositionInStream, cleanupLeasesUponShardCompletion, shardSyncIdleTimeMillis, metricsFactory,
leaseCoordinator.getLeaseManager(), executorService);
initialPositionInStream,
cleanupLeasesUponShardCompletion,
shardSyncIdleTimeMillis,
metricsFactory,
executorService);
this.taskBackoffTimeMillis = taskBackoffTimeMillis; this.taskBackoffTimeMillis = taskBackoffTimeMillis;
this.failoverTimeMillis = failoverTimeMillis; this.failoverTimeMillis = failoverTimeMillis;
this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist; this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist;
this.shardPrioritization = shardPrioritization; this.shardPrioritization = shardPrioritization;
} }
@ -344,8 +361,7 @@ public class Worker implements Runnable {
} }
/** /**
* Start consuming data from the stream, and pass it to the application * Start consuming data from the stream, and pass it to the application record processors.
* record processors.
*/ */
public void run() { public void run() {
if (shutdown) { if (shutdown) {
@ -418,12 +434,8 @@ public class Worker implements Runnable {
if (!skipShardSyncAtWorkerInitializationIfLeasesExist if (!skipShardSyncAtWorkerInitializationIfLeasesExist
|| leaseCoordinator.getLeaseManager().isLeaseTableEmpty()) { || leaseCoordinator.getLeaseManager().isLeaseTableEmpty()) {
LOG.info("Syncing Kinesis shard info"); LOG.info("Syncing Kinesis shard info");
ShardSyncTask shardSyncTask = ShardSyncTask shardSyncTask = new ShardSyncTask(streamConfig.getStreamProxy(),
new ShardSyncTask(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(), initialPosition, cleanupLeasesUponShardCompletion, 0L);
leaseCoordinator.getLeaseManager(),
initialPosition,
cleanupLeasesUponShardCompletion,
0L);
result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call(); result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call();
} else { } else {
LOG.info("Skipping shard sync per config setting (and lease table is not empty)"); LOG.info("Skipping shard sync per config setting (and lease table is not empty)");
@ -460,14 +472,13 @@ public class Worker implements Runnable {
} }
/** /**
* NOTE: This method is internal/private to the Worker class. It has package * NOTE: This method is internal/private to the Worker class. It has package access solely for testing.
* access solely for testing.
* *
* This method relies on ShardInfo.equals() method returning true for ShardInfo objects which may have been * This method relies on ShardInfo.equals() method returning true for ShardInfo objects which may have been
* instantiated with parentShardIds in a different order (and rest of the fields being the equal). For example * instantiated with parentShardIds in a different order (and rest of the fields being the equal). For example
* shardInfo1.equals(shardInfo2) should return true with shardInfo1 and shardInfo2 defined as follows. * shardInfo1.equals(shardInfo2) should return true with shardInfo1 and shardInfo2 defined as follows. ShardInfo
* ShardInfo shardInfo1 = new ShardInfo(shardId1, concurrencyToken1, Arrays.asList("parent1", "parent2")); * shardInfo1 = new ShardInfo(shardId1, concurrencyToken1, Arrays.asList("parent1", "parent2")); ShardInfo
* ShardInfo shardInfo2 = new ShardInfo(shardId1, concurrencyToken1, Arrays.asList("parent2", "parent1")); * shardInfo2 = new ShardInfo(shardId1, concurrencyToken1, Arrays.asList("parent2", "parent1"));
*/ */
void cleanupShardConsumers(Set<ShardInfo> assignedShards) { void cleanupShardConsumers(Set<ShardInfo> assignedShards) {
for (ShardInfo shard : shardInfoShardConsumerMap.keySet()) { for (ShardInfo shard : shardInfoShardConsumerMap.keySet()) {
@ -507,8 +518,57 @@ public class Worker implements Runnable {
} }
/** /**
* Requests shutdown of the worker, notifying record processors, that implement {@link IShutdownNotificationAware}, * Starts the requestedShutdown process, and returns a future that can be used to track the process.
* of the impending shutdown. This gives the record processor a final chance to checkpoint. *
* This is deprecated in favor of {@link #startGracefulShutdown()}, which returns a more complete future, and
* indicates the process behavior
*
* @return a future that will be set once shutdown is completed.
*/
@Deprecated
public Future<Void> requestShutdown() {
Future<Boolean> requestedShutdownFuture = startGracefulShutdown();
return new Future<Void>() {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return requestedShutdownFuture.cancel(mayInterruptIfRunning);
}
@Override
public boolean isCancelled() {
return requestedShutdownFuture.isCancelled();
}
@Override
public boolean isDone() {
return requestedShutdownFuture.isDone();
}
@Override
public Void get() throws InterruptedException, ExecutionException {
requestedShutdownFuture.get();
return null;
}
@Override
public Void get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
requestedShutdownFuture.get(timeout, unit);
return null;
}
};
}
/**
* Requests a graceful shutdown of the worker, notifying record processors, that implement
* {@link IShutdownNotificationAware}, of the impending shutdown. This gives the record processor a final chance to
* checkpoint.
*
* This will only create a single shutdown future. Additional attempts to start a graceful shutdown will return the
* previous future.
* *
* <b>It's possible that a record processor won't be notify before being shutdown. This can occur if the lease is * <b>It's possible that a record processor won't be notify before being shutdown. This can occur if the lease is
* lost after requesting shutdown, but before the notification is dispatched.</b> * lost after requesting shutdown, but before the notification is dispatched.</b>
@ -532,48 +592,85 @@ public class Worker implements Runnable {
* <li>Once the worker shutdown is complete, the returned future is completed.</li> * <li>Once the worker shutdown is complete, the returned future is completed.</li>
* </ol> * </ol>
* *
* * @return a future that will be set once the shutdown has completed. True indicates that the graceful shutdown
* * completed successfully. A false value indicates that a non-exception case caused the shutdown process to
* @return a Future that will be set once the shutdown is complete. * terminate early.
*/ */
public Future<Void> requestShutdown() { public Future<Boolean> startGracefulShutdown() {
synchronized (this) {
// if (gracefulShutdownFuture == null) {
// Stop accepting new leases. Once we do this we can be sure that gracefulShutdownFuture = gracefulShutdownCoordinator
// no more leases will be acquired. .startGracefulShutdown(createGracefulShutdownCallable());
//
leaseCoordinator.stopLeaseTaker();
Collection<KinesisClientLease> leases = leaseCoordinator.getAssignments();
if (leases == null || leases.isEmpty()) {
//
// If there are no leases notification is already completed, but we still need to shutdown the worker.
//
this.shutdown();
return Futures.immediateFuture(null);
}
CountDownLatch shutdownCompleteLatch = new CountDownLatch(leases.size());
CountDownLatch notificationCompleteLatch = new CountDownLatch(leases.size());
for (KinesisClientLease lease : leases) {
ShutdownNotification shutdownNotification = new ShardConsumerShutdownNotification(leaseCoordinator, lease,
notificationCompleteLatch, shutdownCompleteLatch);
ShardInfo shardInfo = KinesisClientLibLeaseCoordinator.convertLeaseToAssignment(lease);
ShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo);
if (consumer != null) {
consumer.notifyShutdownRequested(shutdownNotification);
} else {
//
// There is a race condition between retrieving the current assignments, and creating the
// notification. If the a lease is lost in between these two points, we explicitly decrement the
// notification latches to clear the shutdown.
//
notificationCompleteLatch.countDown();
shutdownCompleteLatch.countDown();
} }
} }
return gracefulShutdownFuture;
}
return new ShutdownFuture(shutdownCompleteLatch, notificationCompleteLatch, this); /**
* Creates a callable that will execute the graceful shutdown process. This callable can be used to execute graceful
* shutdowns in your own executor, or execute the shutdown synchronously.
*
* @return a callable that run the graceful shutdown process. This may return a callable that return true if the
* graceful shutdown has already been completed.
* @throws IllegalStateException
* thrown by the callable if another callable has already started the shutdown process.
*/
public Callable<Boolean> createGracefulShutdownCallable() {
if (isShutdownComplete()) {
return () -> true;
}
Callable<GracefulShutdownContext> startShutdown = createWorkerShutdownCallable();
return gracefulShutdownCoordinator.createGracefulShutdownCallable(startShutdown);
}
public boolean hasGracefulShutdownStarted() {
return gracefuleShutdownStarted;
}
@VisibleForTesting
Callable<GracefulShutdownContext> createWorkerShutdownCallable() {
return () -> {
synchronized (this) {
if (this.gracefuleShutdownStarted) {
throw new IllegalStateException("Requested shutdown has already been started");
}
this.gracefuleShutdownStarted = true;
}
//
// Stop accepting new leases. Once we do this we can be sure that
// no more leases will be acquired.
//
leaseCoordinator.stopLeaseTaker();
Collection<KinesisClientLease> leases = leaseCoordinator.getAssignments();
if (leases == null || leases.isEmpty()) {
//
// If there are no leases notification is already completed, but we still need to shutdown the worker.
//
this.shutdown();
return GracefulShutdownContext.SHUTDOWN_ALREADY_COMPLETED;
}
CountDownLatch shutdownCompleteLatch = new CountDownLatch(leases.size());
CountDownLatch notificationCompleteLatch = new CountDownLatch(leases.size());
for (KinesisClientLease lease : leases) {
ShutdownNotification shutdownNotification = new ShardConsumerShutdownNotification(leaseCoordinator,
lease, notificationCompleteLatch, shutdownCompleteLatch);
ShardInfo shardInfo = KinesisClientLibLeaseCoordinator.convertLeaseToAssignment(lease);
ShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo);
if (consumer != null) {
consumer.notifyShutdownRequested(shutdownNotification);
} else {
//
// There is a race condition between retrieving the current assignments, and creating the
// notification. If the a lease is lost in between these two points, we explicitly decrement the
// notification latches to clear the shutdown.
//
notificationCompleteLatch.countDown();
shutdownCompleteLatch.countDown();
}
}
return new GracefulShutdownContext(shutdownCompleteLatch, notificationCompleteLatch, this);
};
} }
boolean isShutdownComplete() { boolean isShutdownComplete() {
@ -617,8 +714,8 @@ public class Worker implements Runnable {
} }
/** /**
* Perform final shutdown related tasks for the worker including shutting down worker owned * Perform final shutdown related tasks for the worker including shutting down worker owned executor services,
* executor services, threads, etc. * threads, etc.
*/ */
private void finalShutdown() { private void finalShutdown() {
LOG.info("Starting worker's final shutdown."); LOG.info("Starting worker's final shutdown.");
@ -659,11 +756,12 @@ public class Worker implements Runnable {
} }
/** /**
* NOTE: This method is internal/private to the Worker class. It has package * NOTE: This method is internal/private to the Worker class. It has package access solely for testing.
* access solely for testing.
* *
* @param shardInfo Kinesis shard info * @param shardInfo
* @param factory RecordProcessor factory * Kinesis shard info
* @param factory
* RecordProcessor factory
* @return ShardConsumer for the shard * @return ShardConsumer for the shard
*/ */
ShardConsumer createOrGetShardConsumer(ShardInfo shardInfo, IRecordProcessorFactory factory) { ShardConsumer createOrGetShardConsumer(ShardInfo shardInfo, IRecordProcessorFactory factory) {
@ -687,15 +785,15 @@ public class Worker implements Runnable {
return new ShardConsumer(shardInfo, streamConfig, checkpointTracker, recordProcessor, return new ShardConsumer(shardInfo, streamConfig, checkpointTracker, recordProcessor,
leaseCoordinator.getLeaseManager(), parentShardPollIntervalMillis, cleanupLeasesUponShardCompletion, leaseCoordinator.getLeaseManager(), parentShardPollIntervalMillis, cleanupLeasesUponShardCompletion,
executorService, metricsFactory, taskBackoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist); executorService, metricsFactory, taskBackoffTimeMillis,
skipShardSyncAtWorkerInitializationIfLeasesExist);
} }
/** /**
* Logger for suppressing too much INFO logging. To avoid too much logging * Logger for suppressing too much INFO logging. To avoid too much logging information Worker will output logging at
* information Worker will output logging at INFO level for a single pass * INFO level for a single pass through the main loop every minute. At DEBUG level it will output all INFO logs on
* through the main loop every minute. At DEBUG level it will output all * every pass.
* INFO logs on every pass.
*/ */
private static class WorkerLog { private static class WorkerLog {
@ -751,90 +849,89 @@ public class Worker implements Runnable {
// Backwards compatible constructors // Backwards compatible constructors
/** /**
* This constructor is for binary compatibility with code compiled against * This constructor is for binary compatibility with code compiled against version of the KCL that only have
* version of the KCL that only have constructors taking "Client" objects. * constructors taking "Client" objects.
* *
* @param recordProcessorFactory Used to get record processor instances for processing data from shards * @param recordProcessorFactory
* @param config Kinesis Client Library configuration * Used to get record processor instances for processing data from shards
* @param kinesisClient Kinesis Client used for fetching data * @param config
* @param dynamoDBClient DynamoDB client used for checkpoints and tracking leases * Kinesis Client Library configuration
* @param cloudWatchClient CloudWatch Client for publishing metrics * @param kinesisClient
* Kinesis Client used for fetching data
* @param dynamoDBClient
* DynamoDB client used for checkpoints and tracking leases
* @param cloudWatchClient
* CloudWatch Client for publishing metrics
*/ */
public Worker( public Worker(
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory,
KinesisClientLibConfiguration config, KinesisClientLibConfiguration config, AmazonKinesisClient kinesisClient,
AmazonKinesisClient kinesisClient, AmazonDynamoDBClient dynamoDBClient, AmazonCloudWatchClient cloudWatchClient) {
AmazonDynamoDBClient dynamoDBClient, this(recordProcessorFactory, config, (AmazonKinesis) kinesisClient, (AmazonDynamoDB) dynamoDBClient,
AmazonCloudWatchClient cloudWatchClient) {
this(recordProcessorFactory,
config,
(AmazonKinesis) kinesisClient,
(AmazonDynamoDB) dynamoDBClient,
(AmazonCloudWatch) cloudWatchClient); (AmazonCloudWatch) cloudWatchClient);
} }
/** /**
* This constructor is for binary compatibility with code compiled against * This constructor is for binary compatibility with code compiled against version of the KCL that only have
* version of the KCL that only have constructors taking "Client" objects. * constructors taking "Client" objects.
* *
* @param recordProcessorFactory Used to get record processor instances for processing data from shards * @param recordProcessorFactory
* @param config Kinesis Client Library configuration * Used to get record processor instances for processing data from shards
* @param kinesisClient Kinesis Client used for fetching data * @param config
* @param dynamoDBClient DynamoDB client used for checkpoints and tracking leases * Kinesis Client Library configuration
* @param cloudWatchClient CloudWatch Client for publishing metrics * @param kinesisClient
* @param execService ExecutorService to use for processing records (support for multi-threaded * Kinesis Client used for fetching data
* consumption) * @param dynamoDBClient
* DynamoDB client used for checkpoints and tracking leases
* @param cloudWatchClient
* CloudWatch Client for publishing metrics
* @param execService
* ExecutorService to use for processing records (support for multi-threaded consumption)
*/ */
public Worker( public Worker(
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory,
KinesisClientLibConfiguration config, KinesisClientLibConfiguration config, AmazonKinesisClient kinesisClient,
AmazonKinesisClient kinesisClient, AmazonDynamoDBClient dynamoDBClient, AmazonCloudWatchClient cloudWatchClient, ExecutorService execService) {
AmazonDynamoDBClient dynamoDBClient, this(recordProcessorFactory, config, (AmazonKinesis) kinesisClient, (AmazonDynamoDB) dynamoDBClient,
AmazonCloudWatchClient cloudWatchClient, (AmazonCloudWatch) cloudWatchClient, execService);
ExecutorService execService) {
this(recordProcessorFactory,
config,
(AmazonKinesis) kinesisClient,
(AmazonDynamoDB) dynamoDBClient,
(AmazonCloudWatch) cloudWatchClient,
execService);
} }
/** /**
* This constructor is for binary compatibility with code compiled against * This constructor is for binary compatibility with code compiled against version of the KCL that only have
* version of the KCL that only have constructors taking "Client" objects. * constructors taking "Client" objects.
* *
* @param recordProcessorFactory Used to get record processor instances for processing data from shards * @param recordProcessorFactory
* @param config Kinesis Client Library configuration * Used to get record processor instances for processing data from shards
* @param kinesisClient Kinesis Client used for fetching data * @param config
* @param dynamoDBClient DynamoDB client used for checkpoints and tracking leases * Kinesis Client Library configuration
* @param metricsFactory Metrics factory used to emit metrics * @param kinesisClient
* @param execService ExecutorService to use for processing records (support for multi-threaded * Kinesis Client used for fetching data
* consumption) * @param dynamoDBClient
* DynamoDB client used for checkpoints and tracking leases
* @param metricsFactory
* Metrics factory used to emit metrics
* @param execService
* ExecutorService to use for processing records (support for multi-threaded consumption)
*/ */
public Worker( public Worker(
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory,
KinesisClientLibConfiguration config, KinesisClientLibConfiguration config, AmazonKinesisClient kinesisClient,
AmazonKinesisClient kinesisClient, AmazonDynamoDBClient dynamoDBClient, IMetricsFactory metricsFactory, ExecutorService execService) {
AmazonDynamoDBClient dynamoDBClient, this(recordProcessorFactory, config, (AmazonKinesis) kinesisClient, (AmazonDynamoDB) dynamoDBClient,
IMetricsFactory metricsFactory, metricsFactory, execService);
ExecutorService execService) {
this(recordProcessorFactory,
config,
(AmazonKinesis) kinesisClient,
(AmazonDynamoDB) dynamoDBClient,
metricsFactory,
execService);
} }
/** /**
* Given configuration, returns appropriate metrics factory. * Given configuration, returns appropriate metrics factory.
* @param cloudWatchClient Amazon CloudWatch client *
* @param config KinesisClientLibConfiguration * @param cloudWatchClient
* Amazon CloudWatch client
* @param config
* KinesisClientLibConfiguration
* @return Returns metrics factory based on the config. * @return Returns metrics factory based on the config.
*/ */
private static IMetricsFactory getMetricsFactory( private static IMetricsFactory getMetricsFactory(AmazonCloudWatch cloudWatchClient,
AmazonCloudWatch cloudWatchClient, KinesisClientLibConfiguration config) { KinesisClientLibConfiguration config) {
IMetricsFactory metricsFactory; IMetricsFactory metricsFactory;
if (config.getMetricsLevel() == MetricsLevel.NONE) { if (config.getMetricsLevel() == MetricsLevel.NONE) {
metricsFactory = new NullMetricsFactory(); metricsFactory = new NullMetricsFactory();
@ -844,12 +941,8 @@ public class Worker implements Runnable {
cloudWatchClient.setRegion(region); cloudWatchClient.setRegion(region);
LOG.debug("The region of Amazon CloudWatch client has been set to " + config.getRegionName()); LOG.debug("The region of Amazon CloudWatch client has been set to " + config.getRegionName());
} }
metricsFactory = new WorkerCWMetricsFactory( metricsFactory = new WorkerCWMetricsFactory(cloudWatchClient, config.getApplicationName(),
cloudWatchClient, config.getMetricsBufferTimeMillis(), config.getMetricsMaxQueueSize(), config.getMetricsLevel(),
config.getApplicationName(),
config.getMetricsBufferTimeMillis(),
config.getMetricsMaxQueueSize(),
config.getMetricsLevel(),
config.getMetricsEnabledDimensions()); config.getMetricsEnabledDimensions());
} }
return metricsFactory; return metricsFactory;
@ -857,6 +950,7 @@ public class Worker implements Runnable {
/** /**
* Returns default executor service that should be used by the worker. * Returns default executor service that should be used by the worker.
*
* @return Default executor service that should be used by the worker. * @return Default executor service that should be used by the worker.
*/ */
private static ExecutorService getExecutorService() { private static ExecutorService getExecutorService() {
@ -865,26 +959,19 @@ public class Worker implements Runnable {
} }
/** /**
* Extension to CWMetricsFactory, so worker can identify whether it owns the metrics factory instance * Extension to CWMetricsFactory, so worker can identify whether it owns the metrics factory instance or not.
* or not.
* Visible and non-final only for testing. * Visible and non-final only for testing.
*/ */
static class WorkerCWMetricsFactory extends CWMetricsFactory { static class WorkerCWMetricsFactory extends CWMetricsFactory {
WorkerCWMetricsFactory(AmazonCloudWatch cloudWatchClient, WorkerCWMetricsFactory(AmazonCloudWatch cloudWatchClient, String namespace, long bufferTimeMillis,
String namespace, int maxQueueSize, MetricsLevel metricsLevel, Set<String> metricsEnabledDimensions) {
long bufferTimeMillis, super(cloudWatchClient, namespace, bufferTimeMillis, maxQueueSize, metricsLevel, metricsEnabledDimensions);
int maxQueueSize,
MetricsLevel metricsLevel,
Set<String> metricsEnabledDimensions) {
super(cloudWatchClient, namespace, bufferTimeMillis,
maxQueueSize, metricsLevel, metricsEnabledDimensions);
} }
} }
/** /**
* Extension to ThreadPoolExecutor, so worker can identify whether it owns the executor service instance * Extension to ThreadPoolExecutor, so worker can identify whether it owns the executor service instance or not.
* or not.
* Visible and non-final only for testing. * Visible and non-final only for testing.
*/ */
static class WorkerThreadPoolExecutor extends ThreadPoolExecutor { static class WorkerThreadPoolExecutor extends ThreadPoolExecutor {
@ -918,24 +1005,25 @@ public class Worker implements Runnable {
} }
/** /**
* Provide a V1 * Provide a V1 {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor
* {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor IRecordProcessor}. * IRecordProcessor}.
* *
* @param recordProcessorFactory Used to get record processor instances for processing data from shards * @param recordProcessorFactory
* Used to get record processor instances for processing data from shards
* @return A reference to this updated object so that method calls can be chained together. * @return A reference to this updated object so that method calls can be chained together.
*/ */
public Builder recordProcessorFactory( public Builder recordProcessorFactory(
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory) {
recordProcessorFactory) {
this.recordProcessorFactory = new V1ToV2RecordProcessorFactoryAdapter(recordProcessorFactory); this.recordProcessorFactory = new V1ToV2RecordProcessorFactoryAdapter(recordProcessorFactory);
return this; return this;
} }
/** /**
* Provide a V2 * Provide a V2 {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor
* {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor IRecordProcessor}. * IRecordProcessor}.
* *
* @param recordProcessorFactory Used to get record processor instances for processing data from shards * @param recordProcessorFactory
* Used to get record processor instances for processing data from shards
* @return A reference to this updated object so that method calls can be chained together. * @return A reference to this updated object so that method calls can be chained together.
*/ */
public Builder recordProcessorFactory(IRecordProcessorFactory recordProcessorFactory) { public Builder recordProcessorFactory(IRecordProcessorFactory recordProcessorFactory) {
@ -946,7 +1034,8 @@ public class Worker implements Runnable {
/** /**
* Set the Worker config. * Set the Worker config.
* *
* @param config Kinesis Client Library configuration * @param config
* Kinesis Client Library configuration
* @return A reference to this updated object so that method calls can be chained together. * @return A reference to this updated object so that method calls can be chained together.
*/ */
public Builder config(KinesisClientLibConfiguration config) { public Builder config(KinesisClientLibConfiguration config) {
@ -957,7 +1046,8 @@ public class Worker implements Runnable {
/** /**
* Set the Kinesis client. * Set the Kinesis client.
* *
* @param kinesisClient Kinesis Client used for fetching data * @param kinesisClient
* Kinesis Client used for fetching data
* @return A reference to this updated object so that method calls can be chained together. * @return A reference to this updated object so that method calls can be chained together.
*/ */
public Builder kinesisClient(AmazonKinesis kinesisClient) { public Builder kinesisClient(AmazonKinesis kinesisClient) {
@ -968,7 +1058,8 @@ public class Worker implements Runnable {
/** /**
* Set the DynamoDB client. * Set the DynamoDB client.
* *
* @param dynamoDBClient DynamoDB client used for checkpoints and tracking leases * @param dynamoDBClient
* DynamoDB client used for checkpoints and tracking leases
* @return A reference to this updated object so that method calls can be chained together. * @return A reference to this updated object so that method calls can be chained together.
*/ */
public Builder dynamoDBClient(AmazonDynamoDB dynamoDBClient) { public Builder dynamoDBClient(AmazonDynamoDB dynamoDBClient) {
@ -979,7 +1070,8 @@ public class Worker implements Runnable {
/** /**
* Set the Cloudwatch client. * Set the Cloudwatch client.
* *
* @param cloudWatchClient CloudWatch Client for publishing metrics * @param cloudWatchClient
* CloudWatch Client for publishing metrics
* @return A reference to this updated object so that method calls can be chained together. * @return A reference to this updated object so that method calls can be chained together.
*/ */
public Builder cloudWatchClient(AmazonCloudWatch cloudWatchClient) { public Builder cloudWatchClient(AmazonCloudWatch cloudWatchClient) {
@ -990,7 +1082,8 @@ public class Worker implements Runnable {
/** /**
* Set the metrics factory. * Set the metrics factory.
* *
* @param metricsFactory Metrics factory used to emit metrics * @param metricsFactory
* Metrics factory used to emit metrics
* @return A reference to this updated object so that method calls can be chained together. * @return A reference to this updated object so that method calls can be chained together.
*/ */
public Builder metricsFactory(IMetricsFactory metricsFactory) { public Builder metricsFactory(IMetricsFactory metricsFactory) {
@ -1001,7 +1094,8 @@ public class Worker implements Runnable {
/** /**
* Set the executor service for processing records. * Set the executor service for processing records.
* *
* @param execService ExecutorService to use for processing records (support for multi-threaded consumption) * @param execService
* ExecutorService to use for processing records (support for multi-threaded consumption)
* @return A reference to this updated object so that method calls can be chained together. * @return A reference to this updated object so that method calls can be chained together.
*/ */
public Builder execService(ExecutorService execService) { public Builder execService(ExecutorService execService) {
@ -1035,8 +1129,7 @@ public class Worker implements Runnable {
"Kinesis Client Library configuration needs to be provided to build Worker"); "Kinesis Client Library configuration needs to be provided to build Worker");
} }
if (recordProcessorFactory == null) { if (recordProcessorFactory == null) {
throw new IllegalArgumentException( throw new IllegalArgumentException("A Record Processor Factory needs to be provided to build Worker");
"A Record Processor Factory needs to be provided to build Worker");
} }
if (execService == null) { if (execService == null) {
@ -1078,7 +1171,7 @@ public class Worker implements Runnable {
+ ". Amazon Kinesis endpoint will overwrite region name."); + ". Amazon Kinesis endpoint will overwrite region name.");
LOG.debug("The region of Amazon Kinesis client has been overwritten to " LOG.debug("The region of Amazon Kinesis client has been overwritten to "
+ config.getKinesisEndpoint()); + config.getKinesisEndpoint());
} else { } else {
LOG.debug("The region of Amazon Kinesis client has been set to " + config.getKinesisEndpoint()); LOG.debug("The region of Amazon Kinesis client has been set to " + config.getKinesisEndpoint());
} }
} }
@ -1089,6 +1182,7 @@ public class Worker implements Runnable {
shardPrioritization = new ParentsFirstShardPrioritization(1); shardPrioritization = new ParentsFirstShardPrioritization(1);
} }
return new Worker(config.getApplicationName(), return new Worker(config.getApplicationName(),
recordProcessorFactory, recordProcessorFactory,
new StreamConfig(new KinesisProxyFactory(config.getKinesisCredentialsProvider(), new StreamConfig(new KinesisProxyFactory(config.getKinesisCredentialsProvider(),
@ -1110,6 +1204,7 @@ public class Worker implements Runnable {
config.getEpsilonMillis(), config.getEpsilonMillis(),
config.getMaxLeasesForWorker(), config.getMaxLeasesForWorker(),
config.getMaxLeasesToStealAtOneTime(), config.getMaxLeasesToStealAtOneTime(),
config.getMaxLeaseRenewalThreads(),
metricsFactory) metricsFactory)
.withInitialLeaseTableReadCapacity(config.getInitialLeaseTableReadCapacity()) .withInitialLeaseTableReadCapacity(config.getInitialLeaseTableReadCapacity())
.withInitialLeaseTableWriteCapacity(config.getInitialLeaseTableWriteCapacity()), .withInitialLeaseTableWriteCapacity(config.getInitialLeaseTableWriteCapacity()),

View file

@ -1,3 +1,18 @@
/*
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
// Generated by the protocol buffer compiler. DO NOT EDIT! // Generated by the protocol buffer compiler. DO NOT EDIT!
// source: messages.proto // source: messages.proto

View file

@ -1,3 +1,17 @@
/*
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.utils; package com.amazonaws.services.kinesis.clientlibrary.utils;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;

View file

@ -19,15 +19,17 @@ import java.util.Map;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import com.amazonaws.services.kinesis.clientlibrary.utils.NamedThreadFactory; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.leases.exceptions.DependencyException; import com.amazonaws.services.kinesis.leases.exceptions.DependencyException;
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.leases.exceptions.LeasingException; import com.amazonaws.services.kinesis.leases.exceptions.LeasingException;
@ -40,6 +42,7 @@ import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope; import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope;
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/** /**
* LeaseCoordinator abstracts away LeaseTaker and LeaseRenewer from the application code that's using leasing. It owns * LeaseCoordinator abstracts away LeaseTaker and LeaseRenewer from the application code that's using leasing. It owns
@ -63,12 +66,10 @@ public class LeaseCoordinator<T extends Lease> {
private static final int DEFAULT_MAX_LEASES_FOR_WORKER = Integer.MAX_VALUE; private static final int DEFAULT_MAX_LEASES_FOR_WORKER = Integer.MAX_VALUE;
private static final int DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME = 1; private static final int DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME = 1;
private static final ThreadFactory LEASE_COORDINATOR_THREAD_FACTORY = new NamedThreadFactory("LeaseCoordinator-"); private static final ThreadFactory LEASE_COORDINATOR_THREAD_FACTORY = new ThreadFactoryBuilder()
private static final ThreadFactory LEASE_RENEWAL_THREAD_FACTORY = new NamedThreadFactory("LeaseRenewer-"); .setNameFormat("LeaseCoordinator-%04d").setDaemon(true).build();
private static final ThreadFactory LEASE_RENEWAL_THREAD_FACTORY = new ThreadFactoryBuilder()
// Package level access for testing. .setNameFormat("LeaseRenewer-%04d").setDaemon(true).build();
static final int MAX_LEASE_RENEWAL_THREAD_COUNT = 20;
private final ILeaseRenewer<T> leaseRenewer; private final ILeaseRenewer<T> leaseRenewer;
private final ILeaseTaker<T> leaseTaker; private final ILeaseTaker<T> leaseTaker;
@ -114,7 +115,8 @@ public class LeaseCoordinator<T extends Lease> {
long epsilonMillis, long epsilonMillis,
IMetricsFactory metricsFactory) { IMetricsFactory metricsFactory) {
this(leaseManager, workerIdentifier, leaseDurationMillis, epsilonMillis, this(leaseManager, workerIdentifier, leaseDurationMillis, epsilonMillis,
DEFAULT_MAX_LEASES_FOR_WORKER, DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME, metricsFactory); DEFAULT_MAX_LEASES_FOR_WORKER, DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME,
KinesisClientLibConfiguration.DEFAULT_MAX_LEASE_RENEWAL_THREADS, metricsFactory);
} }
/** /**
@ -134,8 +136,9 @@ public class LeaseCoordinator<T extends Lease> {
long epsilonMillis, long epsilonMillis,
int maxLeasesForWorker, int maxLeasesForWorker,
int maxLeasesToStealAtOneTime, int maxLeasesToStealAtOneTime,
int maxLeaseRenewerThreadCount,
IMetricsFactory metricsFactory) { IMetricsFactory metricsFactory) {
this.leaseRenewalThreadpool = getLeaseRenewalExecutorService(MAX_LEASE_RENEWAL_THREAD_COUNT); this.leaseRenewalThreadpool = getLeaseRenewalExecutorService(maxLeaseRenewerThreadCount);
this.leaseTaker = new LeaseTaker<T>(leaseManager, workerIdentifier, leaseDurationMillis) this.leaseTaker = new LeaseTaker<T>(leaseManager, workerIdentifier, leaseDurationMillis)
.withMaxLeasesForWorker(maxLeasesForWorker) .withMaxLeasesForWorker(maxLeasesForWorker)
.withMaxLeasesToStealAtOneTime(maxLeasesToStealAtOneTime); .withMaxLeasesToStealAtOneTime(maxLeasesToStealAtOneTime);
@ -366,6 +369,9 @@ public class LeaseCoordinator<T extends Lease> {
* @return Executor service that should be used for lease renewal. * @return Executor service that should be used for lease renewal.
*/ */
private static ExecutorService getLeaseRenewalExecutorService(int maximumPoolSize) { private static ExecutorService getLeaseRenewalExecutorService(int maximumPoolSize) {
return Executors.newFixedThreadPool(maximumPoolSize, LEASE_RENEWAL_THREAD_FACTORY); int coreLeaseCount = Math.max(maximumPoolSize / 4, 2);
return new ThreadPoolExecutor(coreLeaseCount, maximumPoolSize, 60, TimeUnit.SECONDS,
new LinkedTransferQueue<Runnable>(), LEASE_RENEWAL_THREAD_FACTORY);
} }
} }

View file

@ -100,7 +100,7 @@ public class MultiLangDaemonConfig {
kinesisClientLibConfig = configurator.getConfiguration(properties); kinesisClientLibConfig = configurator.getConfiguration(properties);
executorService = buildExecutorService(properties); executorService = buildExecutorService(properties);
recordProcessorFactory = new MultiLangRecordProcessorFactory(executableName, executorService); recordProcessorFactory = new MultiLangRecordProcessorFactory(executableName, executorService, kinesisClientLibConfig);
LOG.info("Running " + kinesisClientLibConfig.getApplicationName() + " to process stream " LOG.info("Running " + kinesisClientLibConfig.getApplicationName() + " to process stream "
+ kinesisClientLibConfig.getStreamName() + " with executable " + executableName); + kinesisClientLibConfig.getStreamName() + " with executable " + executableName);

View file

@ -14,11 +14,9 @@
*/ */
package com.amazonaws.services.kinesis.multilang; package com.amazonaws.services.kinesis.multilang;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
@ -29,9 +27,14 @@ import com.amazonaws.services.kinesis.multilang.messages.ProcessRecordsMessage;
import com.amazonaws.services.kinesis.multilang.messages.ShutdownMessage; import com.amazonaws.services.kinesis.multilang.messages.ShutdownMessage;
import com.amazonaws.services.kinesis.multilang.messages.ShutdownRequestedMessage; import com.amazonaws.services.kinesis.multilang.messages.ShutdownRequestedMessage;
import com.amazonaws.services.kinesis.multilang.messages.StatusMessage; import com.amazonaws.services.kinesis.multilang.messages.StatusMessage;
import lombok.extern.apachecommons.CommonsLog; import lombok.extern.apachecommons.CommonsLog;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/** /**
* An implementation of the multi language protocol. * An implementation of the multi language protocol.
*/ */
@ -41,10 +44,11 @@ class MultiLangProtocol {
private MessageReader messageReader; private MessageReader messageReader;
private MessageWriter messageWriter; private MessageWriter messageWriter;
private final InitializationInput initializationInput; private final InitializationInput initializationInput;
private KinesisClientLibConfiguration configuration;
/** /**
* Constructor. * Constructor.
* *
* @param messageReader * @param messageReader
* A message reader. * A message reader.
* @param messageWriter * @param messageWriter
@ -53,16 +57,17 @@ class MultiLangProtocol {
* information about the shard this processor is starting to process * information about the shard this processor is starting to process
*/ */
MultiLangProtocol(MessageReader messageReader, MessageWriter messageWriter, MultiLangProtocol(MessageReader messageReader, MessageWriter messageWriter,
InitializationInput initializationInput) { InitializationInput initializationInput, KinesisClientLibConfiguration configuration) {
this.messageReader = messageReader; this.messageReader = messageReader;
this.messageWriter = messageWriter; this.messageWriter = messageWriter;
this.initializationInput = initializationInput; this.initializationInput = initializationInput;
this.configuration = configuration;
} }
/** /**
* Writes an {@link InitializeMessage} to the child process's STDIN and waits for the child process to respond with * Writes an {@link InitializeMessage} to the child process's STDIN and waits for the child process to respond with
* a {@link StatusMessage} on its STDOUT. * a {@link StatusMessage} on its STDOUT.
* *
* @return Whether or not this operation succeeded. * @return Whether or not this operation succeeded.
*/ */
boolean initialize() { boolean initialize() {
@ -77,7 +82,7 @@ class MultiLangProtocol {
/** /**
* Writes a {@link ProcessRecordsMessage} to the child process's STDIN and waits for the child process to respond * Writes a {@link ProcessRecordsMessage} to the child process's STDIN and waits for the child process to respond
* with a {@link StatusMessage} on its STDOUT. * with a {@link StatusMessage} on its STDOUT.
* *
* @param processRecordsInput * @param processRecordsInput
* The records, and associated metadata, to process. * The records, and associated metadata, to process.
* @return Whether or not this operation succeeded. * @return Whether or not this operation succeeded.
@ -90,7 +95,7 @@ class MultiLangProtocol {
/** /**
* Writes a {@link ShutdownMessage} to the child process's STDIN and waits for the child process to respond with a * Writes a {@link ShutdownMessage} to the child process's STDIN and waits for the child process to respond with a
* {@link StatusMessage} on its STDOUT. * {@link StatusMessage} on its STDOUT.
* *
* @param checkpointer A checkpointer. * @param checkpointer A checkpointer.
* @param reason Why this processor is being shutdown. * @param reason Why this processor is being shutdown.
* @return Whether or not this operation succeeded. * @return Whether or not this operation succeeded.
@ -119,7 +124,7 @@ class MultiLangProtocol {
* all communications with the child process regarding checkpointing were successful. Note that whether or not the * all communications with the child process regarding checkpointing were successful. Note that whether or not the
* checkpointing itself was successful is not the concern of this method. This method simply cares whether it was * checkpointing itself was successful is not the concern of this method. This method simply cares whether it was
* able to successfully communicate the results of its attempts to checkpoint. * able to successfully communicate the results of its attempts to checkpoint.
* *
* @param action * @param action
* What action is being waited on. * What action is being waited on.
* @param checkpointer * @param checkpointer
@ -150,44 +155,75 @@ class MultiLangProtocol {
/** /**
* Waits for status message and verifies it against the expectation * Waits for status message and verifies it against the expectation
* *
* @param action * @param action
* What action is being waited on. * What action is being waited on.
* @param checkpointer * @param checkpointer
* the original process records request * the original process records request
* @return Whether or not this operation succeeded. * @return Whether or not this operation succeeded.
*/ */
private boolean waitForStatusMessage(String action, IRecordProcessorCheckpointer checkpointer) { boolean waitForStatusMessage(String action, IRecordProcessorCheckpointer checkpointer) {
StatusMessage statusMessage = null; Optional<StatusMessage> statusMessage = Optional.empty();
while (statusMessage == null) { while (!statusMessage.isPresent()) {
Future<Message> future = this.messageReader.getNextMessageFromSTDOUT(); Future<Message> future = this.messageReader.getNextMessageFromSTDOUT();
try { Optional<Message> message = configuration.getTimeoutInSeconds()
Message message = future.get(); .map(second -> futureMethod(() -> future.get(second, TimeUnit.SECONDS), action))
// Note that instanceof doubles as a check against a value being null .orElse(futureMethod(future::get, action));
if (message instanceof CheckpointMessage) {
boolean checkpointWriteSucceeded = checkpoint((CheckpointMessage) message, checkpointer).get(); if (!message.isPresent()) {
if (!checkpointWriteSucceeded) {
return false;
}
} else if (message instanceof StatusMessage) {
statusMessage = (StatusMessage) message;
}
} catch (InterruptedException e) {
log.error(String.format("Interrupted while waiting for %s message for shard %s", action,
initializationInput.getShardId()));
return false;
} catch (ExecutionException e) {
log.error(String.format("Failed to get status message for %s action for shard %s", action,
initializationInput.getShardId()), e);
return false; return false;
} }
Optional<Boolean> checkpointFailed = message.filter(m -> m instanceof CheckpointMessage )
.map(m -> (CheckpointMessage) m)
.flatMap(m -> futureMethod(() -> checkpoint(m, checkpointer).get(), "Checkpoint"))
.map(checkpointSuccess -> !checkpointSuccess);
if (checkpointFailed.orElse(false)) {
return false;
}
statusMessage = message.filter(m -> m instanceof StatusMessage).map(m -> (StatusMessage) m );
} }
return this.validateStatusMessage(statusMessage, action); return this.validateStatusMessage(statusMessage.get(), action);
}
private interface FutureMethod<T> {
T get() throws InterruptedException, TimeoutException, ExecutionException;
}
private <T> Optional<T> futureMethod(FutureMethod<T> fm, String action) {
try {
return Optional.of(fm.get());
} catch (InterruptedException e) {
log.error(String.format("Interrupted while waiting for %s message for shard %s", action,
initializationInput.getShardId()), e);
} catch (ExecutionException e) {
log.error(String.format("Failed to get status message for %s action for shard %s", action,
initializationInput.getShardId()), e);
} catch (TimeoutException e) {
log.error(String.format("Timedout to get status message for %s action for shard %s. Terminating...",
action,
initializationInput.getShardId()),
e);
haltJvm(1);
}
return Optional.empty();
}
/**
* This method is used to halt the JVM. Use this method with utmost caution, since this method will kill the JVM
* without calling the Shutdown hooks.
*
* @param exitStatus The exit status with which the JVM is to be halted.
*/
protected void haltJvm(int exitStatus) {
Runtime.getRuntime().halt(exitStatus);
} }
/** /**
* Utility for confirming that the status message is for the provided action. * Utility for confirming that the status message is for the provided action.
* *
* @param statusMessage The status of the child process. * @param statusMessage The status of the child process.
* @param action The action that was being waited on. * @param action The action that was being waited on.
* @return Whether or not this operation succeeded. * @return Whether or not this operation succeeded.
@ -205,7 +241,7 @@ class MultiLangProtocol {
* provided {@link CheckpointMessage}. If no sequence number is provided, i.e. the sequence number is null, then * provided {@link CheckpointMessage}. If no sequence number is provided, i.e. the sequence number is null, then
* this method will call {@link IRecordProcessorCheckpointer#checkpoint()}. The method returns a future representing * this method will call {@link IRecordProcessorCheckpointer#checkpoint()}. The method returns a future representing
* the attempt to write the result of this checkpoint attempt to the child process. * the attempt to write the result of this checkpoint attempt to the child process.
* *
* @param checkpointMessage A checkpoint message. * @param checkpointMessage A checkpoint message.
* @param checkpointer A checkpointer. * @param checkpointer A checkpointer.
* @return Whether or not this operation succeeded. * @return Whether or not this operation succeeded.

View file

@ -24,6 +24,7 @@ import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateExcep
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -65,6 +66,8 @@ public class MultiLangRecordProcessor implements IRecordProcessor, IShutdownNoti
private MultiLangProtocol protocol; private MultiLangProtocol protocol;
private KinesisClientLibConfiguration configuration;
@Override @Override
public void initialize(InitializationInput initializationInput) { public void initialize(InitializationInput initializationInput) {
try { try {
@ -87,7 +90,7 @@ public class MultiLangRecordProcessor implements IRecordProcessor, IShutdownNoti
// Submit the error reader for execution // Submit the error reader for execution
stderrReadTask = executorService.submit(readSTDERRTask); stderrReadTask = executorService.submit(readSTDERRTask);
protocol = new MultiLangProtocol(messageReader, messageWriter, initializationInput); protocol = new MultiLangProtocol(messageReader, messageWriter, initializationInput, configuration);
if (!protocol.initialize()) { if (!protocol.initialize()) {
throw new RuntimeException("Failed to initialize child process"); throw new RuntimeException("Failed to initialize child process");
} }
@ -173,9 +176,9 @@ public class MultiLangRecordProcessor implements IRecordProcessor, IShutdownNoti
* An obejct mapper. * An obejct mapper.
*/ */
MultiLangRecordProcessor(ProcessBuilder processBuilder, ExecutorService executorService, MultiLangRecordProcessor(ProcessBuilder processBuilder, ExecutorService executorService,
ObjectMapper objectMapper) { ObjectMapper objectMapper, KinesisClientLibConfiguration configuration) {
this(processBuilder, executorService, objectMapper, new MessageWriter(), new MessageReader(), this(processBuilder, executorService, objectMapper, new MessageWriter(), new MessageReader(),
new DrainChildSTDERRTask()); new DrainChildSTDERRTask(), configuration);
} }
/** /**
@ -195,13 +198,16 @@ public class MultiLangRecordProcessor implements IRecordProcessor, IShutdownNoti
* Error reader to read from child process's stderr * Error reader to read from child process's stderr
*/ */
MultiLangRecordProcessor(ProcessBuilder processBuilder, ExecutorService executorService, ObjectMapper objectMapper, MultiLangRecordProcessor(ProcessBuilder processBuilder, ExecutorService executorService, ObjectMapper objectMapper,
MessageWriter messageWriter, MessageReader messageReader, DrainChildSTDERRTask readSTDERRTask) { MessageWriter messageWriter, MessageReader messageReader, DrainChildSTDERRTask readSTDERRTask,
KinesisClientLibConfiguration configuration) {
this.executorService = executorService; this.executorService = executorService;
this.processBuilder = processBuilder; this.processBuilder = processBuilder;
this.objectMapper = objectMapper; this.objectMapper = objectMapper;
this.messageWriter = messageWriter; this.messageWriter = messageWriter;
this.messageReader = messageReader; this.messageReader = messageReader;
this.readSTDERRTask = readSTDERRTask; this.readSTDERRTask = readSTDERRTask;
this.configuration = configuration;
this.state = ProcessState.ACTIVE; this.state = ProcessState.ACTIVE;
} }

View file

@ -16,6 +16,7 @@ package com.amazonaws.services.kinesis.multilang;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -39,12 +40,15 @@ public class MultiLangRecordProcessorFactory implements IRecordProcessorFactory
private final ExecutorService executorService; private final ExecutorService executorService;
private final KinesisClientLibConfiguration configuration;
/** /**
* @param command The command that will do processing for this factory's record processors. * @param command The command that will do processing for this factory's record processors.
* @param executorService An executor service to use while processing inputs and outputs of the child process. * @param executorService An executor service to use while processing inputs and outputs of the child process.
*/ */
public MultiLangRecordProcessorFactory(String command, ExecutorService executorService) { public MultiLangRecordProcessorFactory(String command, ExecutorService executorService,
this(command, executorService, new ObjectMapper()); KinesisClientLibConfiguration configuration) {
this(command, executorService, new ObjectMapper(), configuration);
} }
/** /**
@ -52,11 +56,13 @@ public class MultiLangRecordProcessorFactory implements IRecordProcessorFactory
* @param executorService An executor service to use while processing inputs and outputs of the child process. * @param executorService An executor service to use while processing inputs and outputs of the child process.
* @param objectMapper An object mapper used to convert messages to json to be written to the child process * @param objectMapper An object mapper used to convert messages to json to be written to the child process
*/ */
public MultiLangRecordProcessorFactory(String command, ExecutorService executorService, ObjectMapper objectMapper) { public MultiLangRecordProcessorFactory(String command, ExecutorService executorService, ObjectMapper objectMapper,
KinesisClientLibConfiguration configuration) {
this.command = command; this.command = command;
this.commandArray = command.split(COMMAND_DELIMETER_REGEX); this.commandArray = command.split(COMMAND_DELIMETER_REGEX);
this.executorService = executorService; this.executorService = executorService;
this.objectMapper = objectMapper; this.objectMapper = objectMapper;
this.configuration = configuration;
} }
@Override @Override
@ -65,7 +71,8 @@ public class MultiLangRecordProcessorFactory implements IRecordProcessorFactory
/* /*
* Giving ProcessBuilder the command as an array of Strings allows users to specify command line arguments. * Giving ProcessBuilder the command as an array of Strings allows users to specify command line arguments.
*/ */
return new MultiLangRecordProcessor(new ProcessBuilder(commandArray), executorService, this.objectMapper); return new MultiLangRecordProcessor(new ProcessBuilder(commandArray), executorService, this.objectMapper,
this.configuration);
} }
String[] getCommandArray() { String[] getCommandArray() {

View file

@ -1,3 +1,17 @@
/*
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker; package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerStates.ConsumerState; import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerStates.ConsumerState;

View file

@ -0,0 +1,322 @@
/*
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.not;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.verification.VerificationMode;
@RunWith(MockitoJUnitRunner.class)
public class GracefulShutdownCoordinatorTest {
@Mock
private CountDownLatch shutdownCompleteLatch;
@Mock
private CountDownLatch notificationCompleteLatch;
@Mock
private Worker worker;
@Mock
private Callable<GracefulShutdownContext> contextCallable;
@Mock
private ConcurrentMap<ShardInfo, ShardConsumer> shardInfoConsumerMap;
@Test
public void testAllShutdownCompletedAlready() throws Exception {
Callable<Boolean> requestedShutdownCallable = buildRequestedShutdownCallable();
when(shutdownCompleteLatch.await(anyLong(), any(TimeUnit.class))).thenReturn(true);
when(notificationCompleteLatch.await(anyLong(), any(TimeUnit.class))).thenReturn(true);
assertThat(requestedShutdownCallable.call(), equalTo(true));
verify(shutdownCompleteLatch).await(anyLong(), any(TimeUnit.class));
verify(notificationCompleteLatch).await(anyLong(), any(TimeUnit.class));
verify(worker).shutdown();
}
@Test
public void testNotificationNotCompletedYet() throws Exception {
Callable<Boolean> requestedShutdownCallable = buildRequestedShutdownCallable();
mockLatchAwait(notificationCompleteLatch, false, true);
when(notificationCompleteLatch.getCount()).thenReturn(1L, 0L);
mockLatchAwait(shutdownCompleteLatch, true);
when(shutdownCompleteLatch.getCount()).thenReturn(1L, 1L, 0L);
when(worker.isShutdownComplete()).thenReturn(false, true);
mockShardInfoConsumerMap(1, 0);
assertThat(requestedShutdownCallable.call(), equalTo(true));
verify(notificationCompleteLatch, times(2)).await(anyLong(), any(TimeUnit.class));
verify(notificationCompleteLatch).getCount();
verify(shutdownCompleteLatch).await(anyLong(), any(TimeUnit.class));
verify(shutdownCompleteLatch, times(2)).getCount();
verify(worker).shutdown();
}
@Test
public void testShutdownNotCompletedYet() throws Exception {
Callable<Boolean> requestedShutdownCallable = buildRequestedShutdownCallable();
mockLatchAwait(notificationCompleteLatch, true);
mockLatchAwait(shutdownCompleteLatch, false, true);
when(shutdownCompleteLatch.getCount()).thenReturn(1L, 0L);
when(worker.isShutdownComplete()).thenReturn(false, true);
mockShardInfoConsumerMap(1, 0);
assertThat(requestedShutdownCallable.call(), equalTo(true));
verify(notificationCompleteLatch).await(anyLong(), any(TimeUnit.class));
verify(notificationCompleteLatch, never()).getCount();
verify(shutdownCompleteLatch, times(2)).await(anyLong(), any(TimeUnit.class));
verify(shutdownCompleteLatch, times(2)).getCount();
verify(worker).shutdown();
}
@Test
public void testMultipleAttemptsForNotification() throws Exception {
Callable<Boolean> requestedShutdownCallable = buildRequestedShutdownCallable();
mockLatchAwait(notificationCompleteLatch, false, false, true);
when(notificationCompleteLatch.getCount()).thenReturn(2L, 1L, 0L);
mockLatchAwait(shutdownCompleteLatch, true);
when(shutdownCompleteLatch.getCount()).thenReturn(2L, 2L, 1L, 1L, 0L);
when(worker.isShutdownComplete()).thenReturn(false, false, false, true);
mockShardInfoConsumerMap(2, 1, 0);
assertThat(requestedShutdownCallable.call(), equalTo(true));
verifyLatchAwait(notificationCompleteLatch, 3);
verify(notificationCompleteLatch, times(2)).getCount();
verifyLatchAwait(shutdownCompleteLatch, 1);
verify(shutdownCompleteLatch, times(4)).getCount();
}
@Test
public void testWorkerAlreadyShutdownAtNotification() throws Exception {
Callable<Boolean> requestedShutdownCallable = buildRequestedShutdownCallable();
mockLatchAwait(notificationCompleteLatch, false, true);
when(notificationCompleteLatch.getCount()).thenReturn(1L, 0L);
mockLatchAwait(shutdownCompleteLatch, true);
when(shutdownCompleteLatch.getCount()).thenReturn(1L, 1L, 0L);
when(worker.isShutdownComplete()).thenReturn(true);
mockShardInfoConsumerMap(0);
assertThat(requestedShutdownCallable.call(), equalTo(false));
verifyLatchAwait(notificationCompleteLatch);
verify(notificationCompleteLatch).getCount();
verifyLatchAwait(shutdownCompleteLatch, never());
verify(shutdownCompleteLatch, times(3)).getCount();
}
@Test
public void testWorkerAlreadyShutdownAtComplete() throws Exception {
Callable<Boolean> requestedShutdownCallable = buildRequestedShutdownCallable();
mockLatchAwait(notificationCompleteLatch, true);
mockLatchAwait(shutdownCompleteLatch, false, true);
when(shutdownCompleteLatch.getCount()).thenReturn(1L, 1L, 1L);
when(worker.isShutdownComplete()).thenReturn(true);
mockShardInfoConsumerMap(0);
assertThat(requestedShutdownCallable.call(), equalTo(false));
verifyLatchAwait(notificationCompleteLatch);
verify(notificationCompleteLatch, never()).getCount();
verifyLatchAwait(shutdownCompleteLatch);
verify(shutdownCompleteLatch, times(3)).getCount();
}
@Test
public void testNotificationInterrupted() throws Exception {
Callable<Boolean> requestedShutdownCallable = buildRequestedShutdownCallable();
when(notificationCompleteLatch.await(anyLong(), any(TimeUnit.class))).thenThrow(new InterruptedException());
when(notificationCompleteLatch.getCount()).thenReturn(1L);
when(shutdownCompleteLatch.getCount()).thenReturn(1L);
assertThat(requestedShutdownCallable.call(), equalTo(false));
verifyLatchAwait(notificationCompleteLatch);
verifyLatchAwait(shutdownCompleteLatch, never());
verify(worker, never()).shutdown();
}
@Test
public void testShutdownInterrupted() throws Exception {
Callable<Boolean> requestedShutdownCallable = buildRequestedShutdownCallable();
when(notificationCompleteLatch.await(anyLong(), any(TimeUnit.class))).thenReturn(true);
when(shutdownCompleteLatch.await(anyLong(), any(TimeUnit.class))).thenThrow(new InterruptedException());
when(shutdownCompleteLatch.getCount()).thenReturn(1L);
assertThat(requestedShutdownCallable.call(), equalTo(false));
verifyLatchAwait(notificationCompleteLatch);
verifyLatchAwait(shutdownCompleteLatch);
verify(worker).shutdown();
}
@Test
public void testInterruptedAfterNotification() throws Exception {
Callable<Boolean> requestedShutdownCallable = buildRequestedShutdownCallable();
when(notificationCompleteLatch.await(anyLong(), any(TimeUnit.class))).thenAnswer(invocation -> {
Thread.currentThread().interrupt();
return true;
});
assertThat(requestedShutdownCallable.call(), equalTo(false));
verifyLatchAwait(notificationCompleteLatch);
verifyLatchAwait(shutdownCompleteLatch, never());
verify(worker, never()).shutdown();
}
@Test
public void testInterruptedAfterWorkerShutdown() throws Exception {
Callable<Boolean> requestedShutdownCallable = buildRequestedShutdownCallable();
when(notificationCompleteLatch.await(anyLong(), any(TimeUnit.class))).thenReturn(true);
doAnswer(invocation -> {
Thread.currentThread().interrupt();
return true;
}).when(worker).shutdown();
assertThat(requestedShutdownCallable.call(), equalTo(false));
verifyLatchAwait(notificationCompleteLatch);
verifyLatchAwait(shutdownCompleteLatch, never());
verify(worker).shutdown();
}
@Test
public void testInterruptedDuringNotification() throws Exception {
Callable<Boolean> requestedShutdownCallable = buildRequestedShutdownCallable();
when(notificationCompleteLatch.await(anyLong(), any(TimeUnit.class))).thenAnswer(invocation -> {
Thread.currentThread().interrupt();
return false;
});
when(notificationCompleteLatch.getCount()).thenReturn(1L);
when(shutdownCompleteLatch.getCount()).thenReturn(1L);
assertThat(requestedShutdownCallable.call(), equalTo(false));
verifyLatchAwait(notificationCompleteLatch);
verify(notificationCompleteLatch).getCount();
verifyLatchAwait(shutdownCompleteLatch, never());
verify(shutdownCompleteLatch).getCount();
verify(worker, never()).shutdown();
}
@Test
public void testInterruptedDuringShutdown() throws Exception {
Callable<Boolean> requestedShutdownCallable = buildRequestedShutdownCallable();
when(notificationCompleteLatch.await(anyLong(), any(TimeUnit.class))).thenReturn(true);
when(shutdownCompleteLatch.await(anyLong(), any(TimeUnit.class))).thenAnswer(invocation -> {
Thread.currentThread().interrupt();
return false;
});
when(shutdownCompleteLatch.getCount()).thenReturn(1L);
assertThat(requestedShutdownCallable.call(), equalTo(false));
verifyLatchAwait(notificationCompleteLatch);
verify(notificationCompleteLatch, never()).getCount();
verifyLatchAwait(shutdownCompleteLatch);
verify(shutdownCompleteLatch).getCount();
verify(worker).shutdown();
}
@Test(expected = IllegalStateException.class)
public void testWorkerShutdownCallableThrows() throws Exception {
Callable<Boolean> requestedShutdownCallable = new GracefulShutdownCoordinator().createGracefulShutdownCallable(contextCallable);
when(contextCallable.call()).thenThrow(new IllegalStateException("Bad Shutdown"));
requestedShutdownCallable.call();
}
private void verifyLatchAwait(CountDownLatch latch) throws Exception {
verifyLatchAwait(latch, times(1));
}
private void verifyLatchAwait(CountDownLatch latch, int times) throws Exception {
verifyLatchAwait(latch, times(times));
}
private void verifyLatchAwait(CountDownLatch latch, VerificationMode verificationMode) throws Exception {
verify(latch, verificationMode).await(anyLong(), any(TimeUnit.class));
}
private void mockLatchAwait(CountDownLatch latch, Boolean initial, Boolean... remaining) throws Exception {
when(latch.await(anyLong(), any(TimeUnit.class))).thenReturn(initial, remaining);
}
private Callable<Boolean> buildRequestedShutdownCallable() throws Exception {
GracefulShutdownContext context = new GracefulShutdownContext(shutdownCompleteLatch,
notificationCompleteLatch, worker);
when(contextCallable.call()).thenReturn(context);
return new GracefulShutdownCoordinator().createGracefulShutdownCallable(contextCallable);
}
private void mockShardInfoConsumerMap(Integer initialItemCount, Integer... additionalItemCounts) {
when(worker.getShardInfoShardConsumerMap()).thenReturn(shardInfoConsumerMap);
Boolean additionalEmptyStates[] = new Boolean[additionalItemCounts.length];
for (int i = 0; i < additionalItemCounts.length; ++i) {
additionalEmptyStates[i] = additionalItemCounts[i] == 0;
}
when(shardInfoConsumerMap.size()).thenReturn(initialItemCount, additionalItemCounts);
when(shardInfoConsumerMap.isEmpty()).thenReturn(initialItemCount == 0, additionalEmptyStates);
}
}

View file

@ -1,3 +1,17 @@
/*
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker; package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;

View file

@ -1,236 +0,0 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import org.mockito.stubbing.OngoingStubbing;
@RunWith(MockitoJUnitRunner.class)
public class ShutdownFutureTest {
@Mock
private CountDownLatch shutdownCompleteLatch;
@Mock
private CountDownLatch notificationCompleteLatch;
@Mock
private Worker worker;
@Mock
private ConcurrentMap<ShardInfo, ShardConsumer> shardInfoConsumerMap;
@Test
public void testSimpleGetAlreadyCompleted() throws Exception {
ShutdownFuture future = new ShutdownFuture(shutdownCompleteLatch, notificationCompleteLatch, worker);
mockNotificationComplete(true);
mockShutdownComplete(true);
future.get();
verify(notificationCompleteLatch).await(anyLong(), any(TimeUnit.class));
verify(worker).shutdown();
verify(shutdownCompleteLatch).await(anyLong(), any(TimeUnit.class));
}
@Test
public void testNotificationNotCompleted() throws Exception {
ShutdownFuture future = new ShutdownFuture(shutdownCompleteLatch, notificationCompleteLatch, worker);
mockNotificationComplete(false, true);
mockShutdownComplete(true);
when(worker.getShardInfoShardConsumerMap()).thenReturn(shardInfoConsumerMap);
when(shardInfoConsumerMap.isEmpty()).thenReturn(false);
when(worker.isShutdownComplete()).thenReturn(false);
when(notificationCompleteLatch.getCount()).thenReturn(1L);
when(shutdownCompleteLatch.getCount()).thenReturn(1L);
expectedTimeoutException(future);
verify(worker, never()).shutdown();
awaitFuture(future);
verify(notificationCompleteLatch).getCount();
verifyLatchAwait(notificationCompleteLatch, 2);
verify(shutdownCompleteLatch).getCount();
verifyLatchAwait(shutdownCompleteLatch);
verify(worker).shutdown();
}
@Test
public void testShutdownNotCompleted() throws Exception {
ShutdownFuture future = new ShutdownFuture(shutdownCompleteLatch, notificationCompleteLatch, worker);
mockNotificationComplete(true);
mockShutdownComplete(false, true);
when(shutdownCompleteLatch.getCount()).thenReturn(1L);
when(worker.isShutdownComplete()).thenReturn(false);
mockShardInfoConsumerMap(1);
expectedTimeoutException(future);
verify(worker).shutdown();
awaitFuture(future);
verifyLatchAwait(notificationCompleteLatch, 2);
verifyLatchAwait(shutdownCompleteLatch, 2);
verify(worker).isShutdownComplete();
verify(worker).getShardInfoShardConsumerMap();
}
@Test
public void testShutdownNotCompleteButWorkerShutdown() throws Exception {
ShutdownFuture future = create();
mockNotificationComplete(true);
mockShutdownComplete(false);
when(shutdownCompleteLatch.getCount()).thenReturn(1L);
when(worker.isShutdownComplete()).thenReturn(true);
mockShardInfoConsumerMap(1);
awaitFuture(future);
verify(worker).shutdown();
verifyLatchAwait(notificationCompleteLatch);
verifyLatchAwait(shutdownCompleteLatch);
verify(worker, times(2)).isShutdownComplete();
verify(worker).getShardInfoShardConsumerMap();
verify(shardInfoConsumerMap).size();
}
@Test
public void testShutdownNotCompleteButShardConsumerEmpty() throws Exception {
ShutdownFuture future = create();
mockNotificationComplete(true);
mockShutdownComplete(false);
mockOutstanding(shutdownCompleteLatch, 1L);
when(worker.isShutdownComplete()).thenReturn(false);
mockShardInfoConsumerMap(0);
awaitFuture(future);
verify(worker).shutdown();
verifyLatchAwait(notificationCompleteLatch);
verifyLatchAwait(shutdownCompleteLatch);
verify(worker, times(2)).isShutdownComplete();
verify(worker, times(2)).getShardInfoShardConsumerMap();
verify(shardInfoConsumerMap).isEmpty();
verify(shardInfoConsumerMap).size();
}
@Test
public void testNotificationNotCompleteButShardConsumerEmpty() throws Exception {
ShutdownFuture future = create();
mockNotificationComplete(false);
mockShutdownComplete(false);
mockOutstanding(notificationCompleteLatch, 1L);
mockOutstanding(shutdownCompleteLatch, 1L);
when(worker.isShutdownComplete()).thenReturn(false);
mockShardInfoConsumerMap(0);
awaitFuture(future);
verify(worker, never()).shutdown();
verifyLatchAwait(notificationCompleteLatch);
verify(shutdownCompleteLatch, never()).await();
verify(worker, times(2)).isShutdownComplete();
verify(worker, times(2)).getShardInfoShardConsumerMap();
verify(shardInfoConsumerMap).isEmpty();
verify(shardInfoConsumerMap).size();
}
@Test(expected = TimeoutException.class)
public void testTimeExceededException() throws Exception {
ShutdownFuture future = create();
mockNotificationComplete(false);
mockOutstanding(notificationCompleteLatch, 1L);
when(worker.isShutdownComplete()).thenReturn(false);
mockShardInfoConsumerMap(1);
future.get(1, TimeUnit.NANOSECONDS);
}
private ShutdownFuture create() {
return new ShutdownFuture(shutdownCompleteLatch, notificationCompleteLatch, worker);
}
private void mockShardInfoConsumerMap(Integer initialItemCount, Integer ... additionalItemCounts) {
when(worker.getShardInfoShardConsumerMap()).thenReturn(shardInfoConsumerMap);
Boolean additionalEmptyStates[] = new Boolean[additionalItemCounts.length];
for(int i = 0; i < additionalItemCounts.length; ++i) {
additionalEmptyStates[i] = additionalItemCounts[i] == 0;
}
when(shardInfoConsumerMap.size()).thenReturn(initialItemCount, additionalItemCounts);
when(shardInfoConsumerMap.isEmpty()).thenReturn(initialItemCount == 0, additionalEmptyStates);
}
private void verifyLatchAwait(CountDownLatch latch) throws Exception {
verifyLatchAwait(latch, 1);
}
private void verifyLatchAwait(CountDownLatch latch, int times) throws Exception {
verify(latch, times(times)).await(anyLong(), any(TimeUnit.class));
}
private void expectedTimeoutException(ShutdownFuture future) throws Exception {
boolean gotTimeout = false;
try {
awaitFuture(future);
} catch (TimeoutException te) {
gotTimeout = true;
}
assertThat("Expected a timeout exception to occur", gotTimeout);
}
private void awaitFuture(ShutdownFuture future) throws Exception {
future.get(1, TimeUnit.SECONDS);
}
private void mockNotificationComplete(Boolean initial, Boolean... states) throws Exception {
mockLatch(notificationCompleteLatch, initial, states);
}
private void mockShutdownComplete(Boolean initial, Boolean... states) throws Exception {
mockLatch(shutdownCompleteLatch, initial, states);
}
private void mockLatch(CountDownLatch latch, Boolean initial, Boolean... states) throws Exception {
when(latch.await(anyLong(), any(TimeUnit.class))).thenReturn(initial, states);
}
private void mockOutstanding(CountDownLatch latch, Long remaining, Long ... additionalRemaining) throws Exception {
when(latch.getCount()).thenReturn(remaining, additionalRemaining);
}
}

View file

@ -1,3 +1,17 @@
/*
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker; package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;

View file

@ -1,16 +1,16 @@
/* /*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* *
* Licensed under the Amazon Software License (the "License"). * Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License. * You may not use this file except in compliance with the License.
* A copy of the License is located at * A copy of the License is located at
* *
* http://aws.amazon.com/asl/ * http://aws.amazon.com/asl/
* *
* or in the "license" file accompanying this file. This file is distributed * or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing * express or implied. See the License for the specific language governing
* permissions and limitations under the License. * permissions and limitations under the License.
*/ */
package com.amazonaws.services.kinesis.clientlibrary.lib.worker; package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
@ -762,7 +762,7 @@ public class WorkerTest {
verify(executorService, atLeastOnce()).submit(argThat( verify(executorService, atLeastOnce()).submit(argThat(
both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.INITIALIZE)))); both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.INITIALIZE))));
worker.requestShutdown(); worker.createWorkerShutdownCallable().call();
worker.runProcessLoop(); worker.runProcessLoop();
verify(executorService, atLeastOnce()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class)) verify(executorService, atLeastOnce()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
@ -781,6 +781,146 @@ public class WorkerTest {
} }
@Test(expected = IllegalStateException.class)
public void testShutdownCallableNotAllowedTwice() throws Exception {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
StreamConfig streamConfig = mock(StreamConfig.class);
IMetricsFactory metricsFactory = mock(IMetricsFactory.class);
ExtendedSequenceNumber checkpoint = new ExtendedSequenceNumber("123", 0L);
KinesisClientLeaseBuilder builder = new KinesisClientLeaseBuilder().withCheckpoint(checkpoint)
.withConcurrencyToken(UUID.randomUUID()).withLastCounterIncrementNanos(0L).withLeaseCounter(0L)
.withOwnerSwitchesSinceCheckpoint(0L).withLeaseOwner("Self");
final List<KinesisClientLease> leases = new ArrayList<>();
final List<ShardInfo> currentAssignments = new ArrayList<>();
KinesisClientLease lease = builder.withLeaseKey(String.format("shardId-%03d", 1)).build();
leases.add(lease);
currentAssignments.add(new ShardInfo(lease.getLeaseKey(), lease.getConcurrencyToken().toString(),
lease.getParentShardIds(), lease.getCheckpoint()));
when(leaseCoordinator.getAssignments()).thenAnswer(new Answer<List<KinesisClientLease>>() {
@Override
public List<KinesisClientLease> answer(InvocationOnMock invocation) throws Throwable {
return leases;
}
});
when(leaseCoordinator.getCurrentAssignments()).thenAnswer(new Answer<List<ShardInfo>>() {
@Override
public List<ShardInfo> answer(InvocationOnMock invocation) throws Throwable {
return currentAssignments;
}
});
IRecordProcessor processor = mock(IRecordProcessor.class);
when(recordProcessorFactory.createProcessor()).thenReturn(processor);
Worker worker = new InjectableWorker("testRequestShutdown", recordProcessorFactory, streamConfig,
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory,
taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization) {
@Override
void postConstruct() {
this.gracefuleShutdownStarted = true;
}
};
when(executorService.submit(Matchers.<Callable<TaskResult>> any()))
.thenAnswer(new ShutdownHandlingAnswer(taskFuture));
when(taskFuture.isDone()).thenReturn(true);
when(taskFuture.get()).thenReturn(taskResult);
worker.runProcessLoop();
verify(executorService, atLeastOnce()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
.and(TaskTypeMatcher.isOfType(TaskType.BLOCK_ON_PARENT_SHARDS))));
worker.runProcessLoop();
verify(executorService, atLeastOnce()).submit(argThat(
both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.INITIALIZE))));
assertThat(worker.hasGracefulShutdownStarted(), equalTo(true));
worker.createWorkerShutdownCallable().call();
}
@Test
public void testGracefulShutdownSingleFuture() throws Exception {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
StreamConfig streamConfig = mock(StreamConfig.class);
IMetricsFactory metricsFactory = mock(IMetricsFactory.class);
ExtendedSequenceNumber checkpoint = new ExtendedSequenceNumber("123", 0L);
KinesisClientLeaseBuilder builder = new KinesisClientLeaseBuilder().withCheckpoint(checkpoint)
.withConcurrencyToken(UUID.randomUUID()).withLastCounterIncrementNanos(0L).withLeaseCounter(0L)
.withOwnerSwitchesSinceCheckpoint(0L).withLeaseOwner("Self");
final List<KinesisClientLease> leases = new ArrayList<>();
final List<ShardInfo> currentAssignments = new ArrayList<>();
KinesisClientLease lease = builder.withLeaseKey(String.format("shardId-%03d", 1)).build();
leases.add(lease);
currentAssignments.add(new ShardInfo(lease.getLeaseKey(), lease.getConcurrencyToken().toString(),
lease.getParentShardIds(), lease.getCheckpoint()));
when(leaseCoordinator.getAssignments()).thenAnswer(new Answer<List<KinesisClientLease>>() {
@Override
public List<KinesisClientLease> answer(InvocationOnMock invocation) throws Throwable {
return leases;
}
});
when(leaseCoordinator.getCurrentAssignments()).thenAnswer(new Answer<List<ShardInfo>>() {
@Override
public List<ShardInfo> answer(InvocationOnMock invocation) throws Throwable {
return currentAssignments;
}
});
IRecordProcessor processor = mock(IRecordProcessor.class);
when(recordProcessorFactory.createProcessor()).thenReturn(processor);
GracefulShutdownCoordinator coordinator = mock(GracefulShutdownCoordinator.class);
when(coordinator.createGracefulShutdownCallable(any(Callable.class))).thenReturn(() -> true);
Future<Boolean> gracefulShutdownFuture = mock(Future.class);
when(coordinator.startGracefulShutdown(any(Callable.class))).thenReturn(gracefulShutdownFuture);
Worker worker = new InjectableWorker("testRequestShutdown", recordProcessorFactory, streamConfig,
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory,
taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization) {
@Override
void postConstruct() {
this.gracefulShutdownCoordinator = coordinator;
}
};
when(executorService.submit(Matchers.<Callable<TaskResult>> any()))
.thenAnswer(new ShutdownHandlingAnswer(taskFuture));
when(taskFuture.isDone()).thenReturn(true);
when(taskFuture.get()).thenReturn(taskResult);
worker.runProcessLoop();
verify(executorService, atLeastOnce()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
.and(TaskTypeMatcher.isOfType(TaskType.BLOCK_ON_PARENT_SHARDS))));
worker.runProcessLoop();
verify(executorService, atLeastOnce()).submit(argThat(
both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.INITIALIZE))));
Future<Boolean> firstFuture = worker.startGracefulShutdown();
Future<Boolean> secondFuture = worker.startGracefulShutdown();
assertThat(firstFuture, equalTo(secondFuture));
verify(coordinator).startGracefulShutdown(any(Callable.class));
}
@Test @Test
public void testRequestShutdownNoLeases() throws Exception { public void testRequestShutdownNoLeases() throws Exception {
@ -830,7 +970,7 @@ public class WorkerTest {
verify(executorService, never()).submit(argThat( verify(executorService, never()).submit(argThat(
both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.INITIALIZE)))); both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.INITIALIZE))));
worker.requestShutdown(); worker.createWorkerShutdownCallable().call();
worker.runProcessLoop(); worker.runProcessLoop();
verify(executorService, never()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class)) verify(executorService, never()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
@ -909,7 +1049,7 @@ public class WorkerTest {
.withField(InitializeTask.class, "shardInfo", equalTo(shardInfo2))))); .withField(InitializeTask.class, "shardInfo", equalTo(shardInfo2)))));
worker.getShardInfoShardConsumerMap().remove(shardInfo2); worker.getShardInfoShardConsumerMap().remove(shardInfo2);
worker.requestShutdown(); worker.createWorkerShutdownCallable().call();
leases.remove(1); leases.remove(1);
currentAssignments.remove(1); currentAssignments.remove(1);
worker.runProcessLoop(); worker.runProcessLoop();
@ -1194,6 +1334,24 @@ public class WorkerTest {
} }
private abstract class InjectableWorker extends Worker {
InjectableWorker(String applicationName, IRecordProcessorFactory recordProcessorFactory,
StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream,
long parentShardPollIntervalMillis, long shardSyncIdleTimeMillis,
boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint,
KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService,
IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) {
super(applicationName, recordProcessorFactory, streamConfig, initialPositionInStream,
parentShardPollIntervalMillis, shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion,
checkpoint, leaseCoordinator, execService, metricsFactory, taskBackoffTimeMillis,
failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, shardPrioritization);
postConstruct();
}
abstract void postConstruct();
}
private KinesisClientLease makeLease(ExtendedSequenceNumber checkpoint, int shardId) { private KinesisClientLease makeLease(ExtendedSequenceNumber checkpoint, int shardId) {
return new KinesisClientLeaseBuilder().withCheckpoint(checkpoint).withConcurrencyToken(UUID.randomUUID()) return new KinesisClientLeaseBuilder().withCheckpoint(checkpoint).withConcurrencyToken(UUID.randomUUID())
.withLastCounterIncrementNanos(0L).withLeaseCounter(0L).withOwnerSwitchesSinceCheckpoint(0L) .withLastCounterIncrementNanos(0L).withLeaseCounter(0L).withOwnerSwitchesSinceCheckpoint(0L)

View file

@ -1,3 +1,17 @@
/*
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.proxies; package com.amazonaws.services.kinesis.clientlibrary.proxies;
import static org.hamcrest.Matchers.both; import static org.hamcrest.Matchers.both;

View file

@ -1,3 +1,17 @@
/*
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.types; package com.amazonaws.services.kinesis.clientlibrary.types;
import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.equalTo;

View file

@ -1,3 +1,17 @@
/*
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.leases.impl; package com.amazonaws.services.kinesis.leases.impl;
import java.util.HashSet; import java.util.HashSet;

View file

@ -1,3 +1,17 @@
/*
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.multilang; package com.amazonaws.services.kinesis.multilang;
import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.equalTo;

View file

@ -14,64 +14,78 @@
*/ */
package com.amazonaws.services.kinesis.multilang; package com.amazonaws.services.kinesis.multilang;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage; import com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage;
import com.amazonaws.services.kinesis.multilang.messages.Message; import com.amazonaws.services.kinesis.multilang.messages.Message;
import com.amazonaws.services.kinesis.multilang.messages.ProcessRecordsMessage; import com.amazonaws.services.kinesis.multilang.messages.ProcessRecordsMessage;
import com.amazonaws.services.kinesis.multilang.messages.StatusMessage; import com.amazonaws.services.kinesis.multilang.messages.StatusMessage;
import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.SettableFuture;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class MultiLangProtocolTest { public class MultiLangProtocolTest {
private static final List<Record> EMPTY_RECORD_LIST = Collections.emptyList(); private static final List<Record> EMPTY_RECORD_LIST = Collections.emptyList();
@Mock
private MultiLangProtocol protocol; private MultiLangProtocol protocol;
@Mock
private MessageWriter messageWriter; private MessageWriter messageWriter;
@Mock
private MessageReader messageReader; private MessageReader messageReader;
private String shardId; private String shardId;
@Mock
private IRecordProcessorCheckpointer checkpointer; private IRecordProcessorCheckpointer checkpointer;
@Mock
private KinesisClientLibConfiguration configuration;
@Before @Before
public void setup() { public void setup() {
this.shardId = "shard-id-123"; this.shardId = "shard-id-123";
messageWriter = Mockito.mock(MessageWriter.class); protocol = new MultiLangProtocolForTesting(messageReader, messageWriter,
messageReader = Mockito.mock(MessageReader.class); new InitializationInput().withShardId(shardId), configuration);
protocol = new MultiLangProtocol(messageReader, messageWriter, new InitializationInput().withShardId(shardId));
checkpointer = Mockito.mock(IRecordProcessorCheckpointer.class); when(configuration.getTimeoutInSeconds()).thenReturn(Optional.empty());
} }
private <T> Future<T> buildFuture(T value) { private <T> Future<T> buildFuture(T value) {
@ -165,7 +179,10 @@ public class MultiLangProtocolTest {
this.add(new StatusMessage("processRecords")); this.add(new StatusMessage("processRecords"));
} }
})); }));
assertThat(protocol.processRecords(new ProcessRecordsInput().withRecords(EMPTY_RECORD_LIST).withCheckpointer(checkpointer)), equalTo(true));
boolean result = protocol.processRecords(new ProcessRecordsInput().withRecords(EMPTY_RECORD_LIST).withCheckpointer(checkpointer));
assertThat(result, equalTo(true));
verify(checkpointer, timeout(1)).checkpoint(); verify(checkpointer, timeout(1)).checkpoint();
verify(checkpointer, timeout(1)).checkpoint("123", 0L); verify(checkpointer, timeout(1)).checkpoint("123", 0L);
@ -183,4 +200,50 @@ public class MultiLangProtocolTest {
})); }));
assertThat(protocol.processRecords(new ProcessRecordsInput().withRecords(EMPTY_RECORD_LIST).withCheckpointer(checkpointer)), equalTo(false)); assertThat(protocol.processRecords(new ProcessRecordsInput().withRecords(EMPTY_RECORD_LIST).withCheckpointer(checkpointer)), equalTo(false));
} }
@Test(expected = NullPointerException.class)
public void waitForStatusMessageTimeoutTest() throws InterruptedException, TimeoutException, ExecutionException {
when(messageWriter.writeProcessRecordsMessage(any(ProcessRecordsInput.class))).thenReturn(buildFuture(true));
Future<Message> future = Mockito.mock(Future.class);
when(messageReader.getNextMessageFromSTDOUT()).thenReturn(future);
when(configuration.getTimeoutInSeconds()).thenReturn(Optional.of(5));
when(future.get(anyInt(), eq(TimeUnit.SECONDS))).thenThrow(TimeoutException.class);
protocol = new MultiLangProtocolForTesting(messageReader,
messageWriter,
new InitializationInput().withShardId(shardId),
configuration);
protocol.processRecords(new ProcessRecordsInput().withRecords(EMPTY_RECORD_LIST));
}
@Test
public void waitForStatusMessageSuccessTest() {
when(messageWriter.writeProcessRecordsMessage(any(ProcessRecordsInput.class))).thenReturn(buildFuture(true));
when(messageReader.getNextMessageFromSTDOUT()).thenReturn(buildFuture(new StatusMessage("processRecords"), Message.class));
when(configuration.getTimeoutInSeconds()).thenReturn(Optional.of(5));
assertTrue(protocol.processRecords(new ProcessRecordsInput().withRecords(EMPTY_RECORD_LIST)));
}
private class MultiLangProtocolForTesting extends MultiLangProtocol {
/**
* Constructor.
*
* @param messageReader A message reader.
* @param messageWriter A message writer.
* @param initializationInput
* @param configuration
*/
MultiLangProtocolForTesting(final MessageReader messageReader,
final MessageWriter messageWriter,
final InitializationInput initializationInput,
final KinesisClientLibConfiguration configuration) {
super(messageReader, messageWriter, initializationInput, configuration);
}
@Override
protected void haltJvm(final int exitStatus) {
throw new NullPointerException();
}
}
} }

View file

@ -14,16 +14,24 @@
*/ */
package com.amazonaws.services.kinesis.multilang; package com.amazonaws.services.kinesis.multilang;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class StreamingRecordProcessorFactoryTest { public class StreamingRecordProcessorFactoryTest {
@Mock
private KinesisClientLibConfiguration configuration;
@Test @Test
public void createProcessorTest() { public void createProcessorTest() {
MultiLangRecordProcessorFactory factory = new MultiLangRecordProcessorFactory("somecommand", null); MultiLangRecordProcessorFactory factory = new MultiLangRecordProcessorFactory("somecommand", null, configuration);
IRecordProcessor processor = factory.createProcessor(); IRecordProcessor processor = factory.createProcessor();
Assert.assertEquals("Should have constructed a StreamingRecordProcessor", MultiLangRecordProcessor.class, Assert.assertEquals("Should have constructed a StreamingRecordProcessor", MultiLangRecordProcessor.class,

View file

@ -14,42 +14,14 @@
*/ */
package com.amazonaws.services.kinesis.multilang; package com.amazonaws.services.kinesis.multilang;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IPreparedCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IPreparedCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
@ -60,6 +32,35 @@ import com.amazonaws.services.kinesis.multilang.messages.ProcessRecordsMessage;
import com.amazonaws.services.kinesis.multilang.messages.ShutdownMessage; import com.amazonaws.services.kinesis.multilang.messages.ShutdownMessage;
import com.amazonaws.services.kinesis.multilang.messages.StatusMessage; import com.amazonaws.services.kinesis.multilang.messages.StatusMessage;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class) @RunWith(MockitoJUnitRunner.class)
public class StreamingRecordProcessorTest { public class StreamingRecordProcessorTest {
@ -137,6 +138,9 @@ public class StreamingRecordProcessorTest {
private MultiLangRecordProcessor recordProcessor; private MultiLangRecordProcessor recordProcessor;
@Mock
private KinesisClientLibConfiguration configuration;
@Before @Before
public void prepare() throws IOException, InterruptedException, ExecutionException { public void prepare() throws IOException, InterruptedException, ExecutionException {
// Fake command // Fake command
@ -150,10 +154,11 @@ public class StreamingRecordProcessorTest {
messageWriter = Mockito.mock(MessageWriter.class); messageWriter = Mockito.mock(MessageWriter.class);
messageReader = Mockito.mock(MessageReader.class); messageReader = Mockito.mock(MessageReader.class);
errorReader = Mockito.mock(DrainChildSTDERRTask.class); errorReader = Mockito.mock(DrainChildSTDERRTask.class);
when(configuration.getTimeoutInSeconds()).thenReturn(Optional.empty());
recordProcessor = recordProcessor =
new MultiLangRecordProcessor(new ProcessBuilder(), executor, new ObjectMapper(), messageWriter, new MultiLangRecordProcessor(new ProcessBuilder(), executor, new ObjectMapper(), messageWriter,
messageReader, errorReader) { messageReader, errorReader, configuration) {
// Just don't do anything when we exit. // Just don't do anything when we exit.
void exit() { void exit() {