Make Graceful Shutdown Run On Its Own Thread (#191)
* Initial start of fix for requested shutdown * Execute the requested shutdown in a separate thread. Fix for Issue #167 * Reworked some of the shutdown logic to make the relationships clearer. * Added/Updated Copyright Statements * Add Missing License Statements
This commit is contained in:
parent
f697a094d9
commit
c067cefa1f
26 changed files with 1321 additions and 700 deletions
|
|
@ -1,10 +0,0 @@
|
|||
source.. = src/main/java,\
|
||||
src/main/resources
|
||||
output.. = bin/
|
||||
|
||||
bin.includes = LICENSE.txt,\
|
||||
NOTICE.txt,\
|
||||
META-INF/,\
|
||||
.
|
||||
|
||||
jre.compilation.profile = JavaSE-1.7
|
||||
|
|
@ -1,3 +1,17 @@
|
|||
/*
|
||||
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.clientlibrary.interfaces.v2;
|
||||
|
||||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
|
||||
|
|
|
|||
|
|
@ -1,3 +1,17 @@
|
|||
/*
|
||||
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -0,0 +1,33 @@
|
|||
/*
|
||||
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
@Data
|
||||
class GracefulShutdownContext {
|
||||
private final CountDownLatch shutdownCompleteLatch;
|
||||
private final CountDownLatch notificationCompleteLatch;
|
||||
private final Worker worker;
|
||||
|
||||
static GracefulShutdownContext SHUTDOWN_ALREADY_COMPLETED = new GracefulShutdownContext(null, null, null);
|
||||
|
||||
boolean isShutdownAlreadyCompleted() {
|
||||
return shutdownCompleteLatch == null && notificationCompleteLatch == null && worker == null;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,163 @@
|
|||
/*
|
||||
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.FutureTask;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
class GracefulShutdownCoordinator {
|
||||
|
||||
Future<Boolean> startGracefulShutdown(Callable<Boolean> shutdownCallable) {
|
||||
FutureTask<Boolean> task = new FutureTask<>(shutdownCallable);
|
||||
Thread shutdownThread = new Thread(task, "RequestedShutdownThread");
|
||||
shutdownThread.start();
|
||||
return task;
|
||||
|
||||
}
|
||||
|
||||
Callable<Boolean> createGracefulShutdownCallable(Callable<GracefulShutdownContext> startWorkerShutdown) {
|
||||
return new GracefulShutdownCallable(startWorkerShutdown);
|
||||
}
|
||||
|
||||
static class GracefulShutdownCallable implements Callable<Boolean> {
|
||||
|
||||
private static final Log log = LogFactory.getLog(GracefulShutdownCallable.class);
|
||||
|
||||
private final Callable<GracefulShutdownContext> startWorkerShutdown;
|
||||
|
||||
GracefulShutdownCallable(Callable<GracefulShutdownContext> startWorkerShutdown) {
|
||||
this.startWorkerShutdown = startWorkerShutdown;
|
||||
}
|
||||
|
||||
private boolean isWorkerShutdownComplete(GracefulShutdownContext context) {
|
||||
return context.getWorker().isShutdownComplete() || context.getWorker().getShardInfoShardConsumerMap().isEmpty();
|
||||
}
|
||||
|
||||
private String awaitingLogMessage(GracefulShutdownContext context) {
|
||||
long awaitingNotification = context.getNotificationCompleteLatch().getCount();
|
||||
long awaitingFinalShutdown = context.getShutdownCompleteLatch().getCount();
|
||||
|
||||
return String.format(
|
||||
"Waiting for %d record process to complete shutdown notification, and %d record processor to complete final shutdown ",
|
||||
awaitingNotification, awaitingFinalShutdown);
|
||||
}
|
||||
|
||||
private String awaitingFinalShutdownMessage(GracefulShutdownContext context) {
|
||||
long outstanding = context.getShutdownCompleteLatch().getCount();
|
||||
return String.format("Waiting for %d record processors to complete final shutdown", outstanding);
|
||||
}
|
||||
|
||||
private boolean waitForRecordProcessors(GracefulShutdownContext context) {
|
||||
|
||||
//
|
||||
// Awaiting for all ShardConsumer/RecordProcessors to be notified that a shutdown has been requested.
|
||||
// There is the possibility of a race condition where a lease is terminated after the shutdown request
|
||||
// notification is started, but before the ShardConsumer is sent the notification. In this case the
|
||||
// ShardConsumer would start the lease loss shutdown, and may never call the notification methods.
|
||||
//
|
||||
try {
|
||||
while (!context.getNotificationCompleteLatch().await(1, TimeUnit.SECONDS)) {
|
||||
if (Thread.interrupted()) {
|
||||
throw new InterruptedException();
|
||||
}
|
||||
log.info(awaitingLogMessage(context));
|
||||
if (workerShutdownWithRemaining(context.getShutdownCompleteLatch().getCount(), context)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
} catch (InterruptedException ie) {
|
||||
log.warn("Interrupted while waiting for notification complete, terminating shutdown. "
|
||||
+ awaitingLogMessage(context));
|
||||
return false;
|
||||
}
|
||||
|
||||
if (Thread.interrupted()) {
|
||||
log.warn("Interrupted before worker shutdown, terminating shutdown");
|
||||
return false;
|
||||
}
|
||||
|
||||
//
|
||||
// Once all record processors have been notified of the shutdown it is safe to allow the worker to
|
||||
// start its shutdown behavior. Once shutdown starts it will stop renewer, and drop any remaining leases.
|
||||
//
|
||||
context.getWorker().shutdown();
|
||||
|
||||
if (Thread.interrupted()) {
|
||||
log.warn("Interrupted after worker shutdown, terminating shutdown");
|
||||
return false;
|
||||
}
|
||||
|
||||
//
|
||||
// Want to wait for all the remaining ShardConsumers/RecordProcessor's to complete their final shutdown
|
||||
// processing. This should really be a no-op since as part of the notification completion the lease for
|
||||
// ShardConsumer is terminated.
|
||||
//
|
||||
try {
|
||||
while (!context.getShutdownCompleteLatch().await(1, TimeUnit.SECONDS)) {
|
||||
if (Thread.interrupted()) {
|
||||
throw new InterruptedException();
|
||||
}
|
||||
log.info(awaitingFinalShutdownMessage(context));
|
||||
if (workerShutdownWithRemaining(context.getShutdownCompleteLatch().getCount(), context)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
} catch (InterruptedException ie) {
|
||||
log.warn("Interrupted while waiting for shutdown completion, terminating shutdown. "
|
||||
+ awaitingFinalShutdownMessage(context));
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* This checks to see if the worker has already hit it's shutdown target, while there is outstanding record
|
||||
* processors. This maybe a little racy due to when the value of outstanding is retrieved. In general though the
|
||||
* latch should be decremented before the shutdown completion.
|
||||
*
|
||||
* @param outstanding
|
||||
* the number of record processor still awaiting shutdown.
|
||||
*/
|
||||
private boolean workerShutdownWithRemaining(long outstanding, GracefulShutdownContext context) {
|
||||
if (isWorkerShutdownComplete(context)) {
|
||||
if (outstanding != 0) {
|
||||
log.info("Shutdown completed, but shutdownCompleteLatch still had outstanding " + outstanding
|
||||
+ " with a current value of " + context.getShutdownCompleteLatch().getCount() + ". shutdownComplete: "
|
||||
+ context.getWorker().isShutdownComplete() + " -- Consumer Map: "
|
||||
+ context.getWorker().getShardInfoShardConsumerMap().size());
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Boolean call() throws Exception {
|
||||
GracefulShutdownContext context;
|
||||
try {
|
||||
context = startWorkerShutdown.call();
|
||||
} catch (Exception ex) {
|
||||
log.warn("Caught exception while requesting initial worker shutdown.", ex);
|
||||
throw ex;
|
||||
}
|
||||
return context.isShutdownAlreadyCompleted() || waitForRecordProcessors(context);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,3 +1,17 @@
|
|||
/*
|
||||
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||
|
||||
import java.util.List;
|
||||
|
|
|
|||
|
|
@ -1,3 +1,17 @@
|
|||
/*
|
||||
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||
|
||||
import java.util.ArrayList;
|
||||
|
|
|
|||
|
|
@ -1,3 +1,17 @@
|
|||
/*
|
||||
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
|
|
|||
|
|
@ -1,3 +1,17 @@
|
|||
/*
|
||||
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||
|
||||
import java.util.List;
|
||||
|
|
|
|||
|
|
@ -1,155 +0,0 @@
|
|||
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
/**
|
||||
* Used as a response from the {@link Worker#requestShutdown()} to allow callers to wait until shutdown is complete.
|
||||
*/
|
||||
class ShutdownFuture implements Future<Void> {
|
||||
|
||||
private static final Log log = LogFactory.getLog(ShutdownFuture.class);
|
||||
|
||||
private final CountDownLatch shutdownCompleteLatch;
|
||||
private final CountDownLatch notificationCompleteLatch;
|
||||
private final Worker worker;
|
||||
|
||||
ShutdownFuture(CountDownLatch shutdownCompleteLatch, CountDownLatch notificationCompleteLatch, Worker worker) {
|
||||
this.shutdownCompleteLatch = shutdownCompleteLatch;
|
||||
this.notificationCompleteLatch = notificationCompleteLatch;
|
||||
this.worker = worker;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean cancel(boolean mayInterruptIfRunning) {
|
||||
throw new UnsupportedOperationException("Cannot cancel a shutdown process");
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isCancelled() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isDone() {
|
||||
return isWorkerShutdownComplete();
|
||||
}
|
||||
|
||||
private boolean isWorkerShutdownComplete() {
|
||||
return worker.isShutdownComplete() || worker.getShardInfoShardConsumerMap().isEmpty();
|
||||
}
|
||||
|
||||
private long outstandingRecordProcessors(long timeout, TimeUnit unit)
|
||||
throws InterruptedException, ExecutionException, TimeoutException {
|
||||
|
||||
final long startNanos = System.nanoTime();
|
||||
|
||||
//
|
||||
// Awaiting for all ShardConsumer/RecordProcessors to be notified that a shutdown has been requested.
|
||||
// There is the possibility of a race condition where a lease is terminated after the shutdown request
|
||||
// notification is started, but before the ShardConsumer is sent the notification. In this case the
|
||||
// ShardConsumer would start the lease loss shutdown, and may never call the notification methods.
|
||||
//
|
||||
if (!notificationCompleteLatch.await(timeout, unit)) {
|
||||
long awaitingNotification = notificationCompleteLatch.getCount();
|
||||
long awaitingFinalShutdown = shutdownCompleteLatch.getCount();
|
||||
log.info("Awaiting " + awaitingNotification + " record processors to complete shutdown notification, and "
|
||||
+ awaitingFinalShutdown + " awaiting final shutdown");
|
||||
if (awaitingFinalShutdown != 0) {
|
||||
//
|
||||
// The number of record processor awaiting final shutdown should be a superset of the those awaiting
|
||||
// notification
|
||||
//
|
||||
return checkWorkerShutdownMiss(awaitingFinalShutdown);
|
||||
}
|
||||
}
|
||||
|
||||
long remaining = remainingTimeout(timeout, unit, startNanos);
|
||||
throwTimeoutMessageIfExceeded(remaining, "Notification hasn't completed within timeout time.");
|
||||
|
||||
//
|
||||
// Once all record processors have been notified of the shutdown it is safe to allow the worker to
|
||||
// start its shutdown behavior. Once shutdown starts it will stop renewer, and drop any remaining leases.
|
||||
//
|
||||
worker.shutdown();
|
||||
remaining = remainingTimeout(timeout, unit, startNanos);
|
||||
throwTimeoutMessageIfExceeded(remaining, "Shutdown hasn't completed within timeout time.");
|
||||
|
||||
//
|
||||
// Want to wait for all the remaining ShardConsumers/RecordProcessor's to complete their final shutdown
|
||||
// processing. This should really be a no-op since as part of the notification completion the lease for
|
||||
// ShardConsumer is terminated.
|
||||
//
|
||||
if (!shutdownCompleteLatch.await(remaining, TimeUnit.NANOSECONDS)) {
|
||||
long outstanding = shutdownCompleteLatch.getCount();
|
||||
log.info("Awaiting " + outstanding + " record processors to complete final shutdown");
|
||||
|
||||
return checkWorkerShutdownMiss(outstanding);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
private long remainingTimeout(long timeout, TimeUnit unit, long startNanos) {
|
||||
long checkNanos = System.nanoTime() - startNanos;
|
||||
return unit.toNanos(timeout) - checkNanos;
|
||||
}
|
||||
|
||||
private void throwTimeoutMessageIfExceeded(long remainingNanos, String message) throws TimeoutException {
|
||||
if (remainingNanos <= 0) {
|
||||
throw new TimeoutException(message);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This checks to see if the worker has already hit it's shutdown target, while there is outstanding record
|
||||
* processors. This maybe a little racy due to when the value of outstanding is retrieved. In general though the
|
||||
* latch should be decremented before the shutdown completion.
|
||||
*
|
||||
* @param outstanding
|
||||
* the number of record processor still awaiting shutdown.
|
||||
* @return the number of record processors awaiting shutdown, or 0 if the worker believes it's shutdown already.
|
||||
*/
|
||||
private long checkWorkerShutdownMiss(long outstanding) {
|
||||
if (isWorkerShutdownComplete()) {
|
||||
if (outstanding != 0) {
|
||||
log.info("Shutdown completed, but shutdownCompleteLatch still had outstanding " + outstanding
|
||||
+ " with a current value of " + shutdownCompleteLatch.getCount() + ". shutdownComplete: "
|
||||
+ worker.isShutdownComplete() + " -- Consumer Map: "
|
||||
+ worker.getShardInfoShardConsumerMap().size());
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
return outstanding;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Void get() throws InterruptedException, ExecutionException {
|
||||
boolean complete = false;
|
||||
do {
|
||||
try {
|
||||
long outstanding = outstandingRecordProcessors(1, TimeUnit.SECONDS);
|
||||
complete = outstanding == 0;
|
||||
log.info("Awaiting " + outstanding + " consumer(s) to finish shutdown.");
|
||||
} catch (TimeoutException te) {
|
||||
log.info("Timeout while waiting for completion: " + te.getMessage());
|
||||
}
|
||||
|
||||
} while(!complete);
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
|
||||
long outstanding = outstandingRecordProcessors(timeout, unit);
|
||||
if (outstanding != 0) {
|
||||
throw new TimeoutException("Awaiting " + outstanding + " record processors to shutdown.");
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
@ -1,3 +1,17 @@
|
|||
/*
|
||||
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||
|
||||
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
|
||||
|
|
|
|||
|
|
@ -1,3 +1,17 @@
|
|||
/*
|
||||
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||
|
||||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
|
||||
|
|
|
|||
|
|
@ -1,3 +1,17 @@
|
|||
/*
|
||||
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||
|
||||
import lombok.Getter;
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
|
|
@ -18,15 +18,18 @@ import java.util.Collection;
|
|||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.SynchronousQueue;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
|
@ -52,14 +55,12 @@ import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory;
|
|||
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
|
||||
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
|
||||
/**
|
||||
* Worker is the high level class that Kinesis applications use to start
|
||||
* processing data. It initializes and oversees different components (e.g.
|
||||
* syncing shard and lease information, tracking shard assignments, and
|
||||
* processing data from the shards).
|
||||
* Worker is the high level class that Kinesis applications use to start processing data. It initializes and oversees
|
||||
* different components (e.g. syncing shard and lease information, tracking shard assignments, and processing data from
|
||||
* the shards).
|
||||
*/
|
||||
public class Worker implements Runnable {
|
||||
|
||||
|
|
@ -96,17 +97,27 @@ public class Worker implements Runnable {
|
|||
|
||||
// Holds consumers for shards the worker is currently tracking. Key is shard
|
||||
// info, value is ShardConsumer.
|
||||
private ConcurrentMap<ShardInfo, ShardConsumer> shardInfoShardConsumerMap =
|
||||
new ConcurrentHashMap<ShardInfo, ShardConsumer>();
|
||||
private ConcurrentMap<ShardInfo, ShardConsumer> shardInfoShardConsumerMap = new ConcurrentHashMap<ShardInfo, ShardConsumer>();
|
||||
private final boolean cleanupLeasesUponShardCompletion;
|
||||
|
||||
private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist;
|
||||
|
||||
/**
|
||||
* Used to ensure that only one requestedShutdown is in progress at a time.
|
||||
*/
|
||||
private Future<Boolean> gracefulShutdownFuture;
|
||||
@VisibleForTesting
|
||||
protected boolean gracefuleShutdownStarted = false;
|
||||
@VisibleForTesting
|
||||
protected GracefulShutdownCoordinator gracefulShutdownCoordinator = new GracefulShutdownCoordinator();
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
*
|
||||
* @param recordProcessorFactory Used to get record processor instances for processing data from shards
|
||||
* @param config Kinesis Client Library configuration
|
||||
* @param recordProcessorFactory
|
||||
* Used to get record processor instances for processing data from shards
|
||||
* @param config
|
||||
* Kinesis Client Library configuration
|
||||
*/
|
||||
public Worker(
|
||||
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory,
|
||||
|
|
@ -117,108 +128,119 @@ public class Worker implements Runnable {
|
|||
/**
|
||||
* Constructor.
|
||||
*
|
||||
* @param recordProcessorFactory Used to get record processor instances for processing data from shards
|
||||
* @param config Kinesis Client Library configuration
|
||||
* @param execService ExecutorService to use for processing records (support for multi-threaded
|
||||
* consumption)
|
||||
* @param recordProcessorFactory
|
||||
* Used to get record processor instances for processing data from shards
|
||||
* @param config
|
||||
* Kinesis Client Library configuration
|
||||
* @param execService
|
||||
* ExecutorService to use for processing records (support for multi-threaded consumption)
|
||||
*/
|
||||
public Worker(
|
||||
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory,
|
||||
KinesisClientLibConfiguration config,
|
||||
ExecutorService execService) {
|
||||
this(recordProcessorFactory, config, new AmazonKinesisClient(config.getKinesisCredentialsProvider(),
|
||||
config.getKinesisClientConfiguration()),
|
||||
KinesisClientLibConfiguration config, ExecutorService execService) {
|
||||
this(recordProcessorFactory, config,
|
||||
new AmazonKinesisClient(config.getKinesisCredentialsProvider(), config.getKinesisClientConfiguration()),
|
||||
new AmazonDynamoDBClient(config.getDynamoDBCredentialsProvider(),
|
||||
config.getDynamoDBClientConfiguration()),
|
||||
new AmazonCloudWatchClient(config.getCloudWatchCredentialsProvider(),
|
||||
config.getCloudWatchClientConfiguration()), execService);
|
||||
config.getCloudWatchClientConfiguration()),
|
||||
execService);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param recordProcessorFactory Used to get record processor instances for processing data from shards
|
||||
* @param config Kinesis Client Library configuration
|
||||
* @param metricsFactory Metrics factory used to emit metrics
|
||||
* @param recordProcessorFactory
|
||||
* Used to get record processor instances for processing data from shards
|
||||
* @param config
|
||||
* Kinesis Client Library configuration
|
||||
* @param metricsFactory
|
||||
* Metrics factory used to emit metrics
|
||||
*/
|
||||
public Worker(
|
||||
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory,
|
||||
KinesisClientLibConfiguration config,
|
||||
IMetricsFactory metricsFactory) {
|
||||
KinesisClientLibConfiguration config, IMetricsFactory metricsFactory) {
|
||||
this(recordProcessorFactory, config, metricsFactory, getExecutorService());
|
||||
}
|
||||
|
||||
/**
|
||||
* @param recordProcessorFactory Used to get record processor instances for processing data from shards
|
||||
* @param config Kinesis Client Library configuration
|
||||
* @param metricsFactory Metrics factory used to emit metrics
|
||||
* @param execService ExecutorService to use for processing records (support for multi-threaded
|
||||
* consumption)
|
||||
* @param recordProcessorFactory
|
||||
* Used to get record processor instances for processing data from shards
|
||||
* @param config
|
||||
* Kinesis Client Library configuration
|
||||
* @param metricsFactory
|
||||
* Metrics factory used to emit metrics
|
||||
* @param execService
|
||||
* ExecutorService to use for processing records (support for multi-threaded consumption)
|
||||
*/
|
||||
public Worker(
|
||||
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory,
|
||||
KinesisClientLibConfiguration config,
|
||||
IMetricsFactory metricsFactory,
|
||||
ExecutorService execService) {
|
||||
this(recordProcessorFactory, config, new AmazonKinesisClient(config.getKinesisCredentialsProvider(),
|
||||
config.getKinesisClientConfiguration()),
|
||||
KinesisClientLibConfiguration config, IMetricsFactory metricsFactory, ExecutorService execService) {
|
||||
this(recordProcessorFactory, config,
|
||||
new AmazonKinesisClient(config.getKinesisCredentialsProvider(), config.getKinesisClientConfiguration()),
|
||||
new AmazonDynamoDBClient(config.getDynamoDBCredentialsProvider(),
|
||||
config.getDynamoDBClientConfiguration()), metricsFactory, execService);
|
||||
config.getDynamoDBClientConfiguration()),
|
||||
metricsFactory, execService);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param recordProcessorFactory Used to get record processor instances for processing data from shards
|
||||
* @param config Kinesis Client Library configuration
|
||||
* @param kinesisClient Kinesis Client used for fetching data
|
||||
* @param dynamoDBClient DynamoDB client used for checkpoints and tracking leases
|
||||
* @param cloudWatchClient CloudWatch Client for publishing metrics
|
||||
* @param recordProcessorFactory
|
||||
* Used to get record processor instances for processing data from shards
|
||||
* @param config
|
||||
* Kinesis Client Library configuration
|
||||
* @param kinesisClient
|
||||
* Kinesis Client used for fetching data
|
||||
* @param dynamoDBClient
|
||||
* DynamoDB client used for checkpoints and tracking leases
|
||||
* @param cloudWatchClient
|
||||
* CloudWatch Client for publishing metrics
|
||||
*/
|
||||
public Worker(
|
||||
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory,
|
||||
KinesisClientLibConfiguration config,
|
||||
AmazonKinesis kinesisClient,
|
||||
AmazonDynamoDB dynamoDBClient,
|
||||
KinesisClientLibConfiguration config, AmazonKinesis kinesisClient, AmazonDynamoDB dynamoDBClient,
|
||||
AmazonCloudWatch cloudWatchClient) {
|
||||
this(recordProcessorFactory, config, kinesisClient, dynamoDBClient, cloudWatchClient, getExecutorService());
|
||||
}
|
||||
|
||||
/**
|
||||
* @param recordProcessorFactory Used to get record processor instances for processing data from shards
|
||||
* @param config Kinesis Client Library configuration
|
||||
* @param kinesisClient Kinesis Client used for fetching data
|
||||
* @param dynamoDBClient DynamoDB client used for checkpoints and tracking leases
|
||||
* @param cloudWatchClient CloudWatch Client for publishing metrics
|
||||
* @param execService ExecutorService to use for processing records (support for multi-threaded
|
||||
* consumption)
|
||||
* @param recordProcessorFactory
|
||||
* Used to get record processor instances for processing data from shards
|
||||
* @param config
|
||||
* Kinesis Client Library configuration
|
||||
* @param kinesisClient
|
||||
* Kinesis Client used for fetching data
|
||||
* @param dynamoDBClient
|
||||
* DynamoDB client used for checkpoints and tracking leases
|
||||
* @param cloudWatchClient
|
||||
* CloudWatch Client for publishing metrics
|
||||
* @param execService
|
||||
* ExecutorService to use for processing records (support for multi-threaded consumption)
|
||||
*/
|
||||
public Worker(
|
||||
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory,
|
||||
KinesisClientLibConfiguration config,
|
||||
AmazonKinesis kinesisClient,
|
||||
AmazonDynamoDB dynamoDBClient,
|
||||
AmazonCloudWatch cloudWatchClient,
|
||||
ExecutorService execService) {
|
||||
this(recordProcessorFactory, config, kinesisClient, dynamoDBClient,
|
||||
getMetricsFactory(cloudWatchClient, config), execService);
|
||||
KinesisClientLibConfiguration config, AmazonKinesis kinesisClient, AmazonDynamoDB dynamoDBClient,
|
||||
AmazonCloudWatch cloudWatchClient, ExecutorService execService) {
|
||||
this(recordProcessorFactory, config, kinesisClient, dynamoDBClient, getMetricsFactory(cloudWatchClient, config),
|
||||
execService);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param recordProcessorFactory Used to get record processor instances for processing data from shards
|
||||
* @param config Kinesis Client Library configuration
|
||||
* @param kinesisClient Kinesis Client used for fetching data
|
||||
* @param dynamoDBClient DynamoDB client used for checkpoints and tracking leases
|
||||
* @param metricsFactory Metrics factory used to emit metrics
|
||||
* @param execService ExecutorService to use for processing records (support for multi-threaded
|
||||
* consumption)
|
||||
* @param recordProcessorFactory
|
||||
* Used to get record processor instances for processing data from shards
|
||||
* @param config
|
||||
* Kinesis Client Library configuration
|
||||
* @param kinesisClient
|
||||
* Kinesis Client used for fetching data
|
||||
* @param dynamoDBClient
|
||||
* DynamoDB client used for checkpoints and tracking leases
|
||||
* @param metricsFactory
|
||||
* Metrics factory used to emit metrics
|
||||
* @param execService
|
||||
* ExecutorService to use for processing records (support for multi-threaded consumption)
|
||||
*/
|
||||
public Worker(
|
||||
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory,
|
||||
KinesisClientLibConfiguration config,
|
||||
AmazonKinesis kinesisClient,
|
||||
AmazonDynamoDB dynamoDBClient,
|
||||
IMetricsFactory metricsFactory,
|
||||
ExecutorService execService) {
|
||||
this(
|
||||
config.getApplicationName(),
|
||||
new V1ToV2RecordProcessorFactoryAdapter(recordProcessorFactory),
|
||||
KinesisClientLibConfiguration config, AmazonKinesis kinesisClient, AmazonDynamoDB dynamoDBClient,
|
||||
IMetricsFactory metricsFactory, ExecutorService execService) {
|
||||
this(config.getApplicationName(), new V1ToV2RecordProcessorFactoryAdapter(recordProcessorFactory),
|
||||
new StreamConfig(
|
||||
new KinesisProxyFactory(config.getKinesisCredentialsProvider(), kinesisClient)
|
||||
.getProxy(config.getStreamName()),
|
||||
|
|
@ -226,11 +248,8 @@ public class Worker implements Runnable {
|
|||
config.shouldCallProcessRecordsEvenForEmptyRecordList(),
|
||||
config.shouldValidateSequenceNumberBeforeCheckpointing(),
|
||||
config.getInitialPositionInStreamExtended()),
|
||||
config.getInitialPositionInStreamExtended(),
|
||||
config.getParentShardPollIntervalMillis(),
|
||||
config.getShardSyncIntervalMillis(),
|
||||
config.shouldCleanupLeasesUponShardCompletion(),
|
||||
null,
|
||||
config.getInitialPositionInStreamExtended(), config.getParentShardPollIntervalMillis(),
|
||||
config.getShardSyncIntervalMillis(), config.shouldCleanupLeasesUponShardCompletion(), null,
|
||||
new KinesisClientLibLeaseCoordinator(
|
||||
new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient),
|
||||
config.getWorkerIdentifier(),
|
||||
|
|
@ -277,41 +296,43 @@ public class Worker implements Runnable {
|
|||
}
|
||||
|
||||
/**
|
||||
* @param applicationName Name of the Kinesis application
|
||||
* @param recordProcessorFactory Used to get record processor instances for processing data from shards
|
||||
* @param streamConfig Stream configuration
|
||||
* @param initialPositionInStream One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. The KinesisClientLibrary will start
|
||||
* fetching data from this location in the stream when an application starts up for the first time and
|
||||
* there are no checkpoints. If there are checkpoints, we start from the checkpoint position.
|
||||
* @param parentShardPollIntervalMillis Wait for this long between polls to check if parent shards are done
|
||||
* @param shardSyncIdleTimeMillis Time between tasks to sync leases and Kinesis shards
|
||||
* @param cleanupLeasesUponShardCompletion Clean up shards we've finished processing (don't wait till they expire in
|
||||
* Kinesis)
|
||||
* @param checkpoint Used to get/set checkpoints
|
||||
* @param leaseCoordinator Lease coordinator (coordinates currently owned leases)
|
||||
* @param execService ExecutorService to use for processing records (support for multi-threaded
|
||||
* consumption)
|
||||
* @param metricsFactory Metrics factory used to emit metrics
|
||||
* @param taskBackoffTimeMillis Backoff period when tasks encounter an exception
|
||||
* @param shardPrioritization Provides prioritization logic to decide which available shards process first
|
||||
* @param applicationName
|
||||
* Name of the Kinesis application
|
||||
* @param recordProcessorFactory
|
||||
* Used to get record processor instances for processing data from shards
|
||||
* @param streamConfig
|
||||
* Stream configuration
|
||||
* @param initialPositionInStream
|
||||
* One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. The KinesisClientLibrary will start fetching data from
|
||||
* this location in the stream when an application starts up for the first time and there are no
|
||||
* checkpoints. If there are checkpoints, we start from the checkpoint position.
|
||||
* @param parentShardPollIntervalMillis
|
||||
* Wait for this long between polls to check if parent shards are done
|
||||
* @param shardSyncIdleTimeMillis
|
||||
* Time between tasks to sync leases and Kinesis shards
|
||||
* @param cleanupLeasesUponShardCompletion
|
||||
* Clean up shards we've finished processing (don't wait till they expire in Kinesis)
|
||||
* @param checkpoint
|
||||
* Used to get/set checkpoints
|
||||
* @param leaseCoordinator
|
||||
* Lease coordinator (coordinates currently owned leases)
|
||||
* @param execService
|
||||
* ExecutorService to use for processing records (support for multi-threaded consumption)
|
||||
* @param metricsFactory
|
||||
* Metrics factory used to emit metrics
|
||||
* @param taskBackoffTimeMillis
|
||||
* Backoff period when tasks encounter an exception
|
||||
* @param shardPrioritization
|
||||
* Provides prioritization logic to decide which available shards process first
|
||||
*/
|
||||
// NOTE: This has package level access solely for testing
|
||||
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES
|
||||
Worker(String applicationName,
|
||||
IRecordProcessorFactory recordProcessorFactory,
|
||||
StreamConfig streamConfig,
|
||||
InitialPositionInStreamExtended initialPositionInStream,
|
||||
long parentShardPollIntervalMillis,
|
||||
long shardSyncIdleTimeMillis,
|
||||
boolean cleanupLeasesUponShardCompletion,
|
||||
ICheckpoint checkpoint,
|
||||
KinesisClientLibLeaseCoordinator leaseCoordinator,
|
||||
ExecutorService execService,
|
||||
IMetricsFactory metricsFactory,
|
||||
long taskBackoffTimeMillis,
|
||||
long failoverTimeMillis,
|
||||
boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
|
||||
ShardPrioritization shardPrioritization) {
|
||||
Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, StreamConfig streamConfig,
|
||||
InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis,
|
||||
long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint,
|
||||
KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService,
|
||||
IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
|
||||
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) {
|
||||
this.applicationName = applicationName;
|
||||
this.recordProcessorFactory = recordProcessorFactory;
|
||||
this.streamConfig = streamConfig;
|
||||
|
|
@ -323,13 +344,8 @@ public class Worker implements Runnable {
|
|||
this.executorService = execService;
|
||||
this.leaseCoordinator = leaseCoordinator;
|
||||
this.metricsFactory = metricsFactory;
|
||||
this.controlServer =
|
||||
new ShardSyncTaskManager(streamConfig.getStreamProxy(),
|
||||
leaseCoordinator.getLeaseManager(),
|
||||
initialPositionInStream,
|
||||
cleanupLeasesUponShardCompletion,
|
||||
shardSyncIdleTimeMillis,
|
||||
metricsFactory,
|
||||
this.controlServer = new ShardSyncTaskManager(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(),
|
||||
initialPositionInStream, cleanupLeasesUponShardCompletion, shardSyncIdleTimeMillis, metricsFactory,
|
||||
executorService);
|
||||
this.taskBackoffTimeMillis = taskBackoffTimeMillis;
|
||||
this.failoverTimeMillis = failoverTimeMillis;
|
||||
|
|
@ -345,8 +361,7 @@ public class Worker implements Runnable {
|
|||
}
|
||||
|
||||
/**
|
||||
* Start consuming data from the stream, and pass it to the application
|
||||
* record processors.
|
||||
* Start consuming data from the stream, and pass it to the application record processors.
|
||||
*/
|
||||
public void run() {
|
||||
if (shutdown) {
|
||||
|
|
@ -419,12 +434,8 @@ public class Worker implements Runnable {
|
|||
if (!skipShardSyncAtWorkerInitializationIfLeasesExist
|
||||
|| leaseCoordinator.getLeaseManager().isLeaseTableEmpty()) {
|
||||
LOG.info("Syncing Kinesis shard info");
|
||||
ShardSyncTask shardSyncTask =
|
||||
new ShardSyncTask(streamConfig.getStreamProxy(),
|
||||
leaseCoordinator.getLeaseManager(),
|
||||
initialPosition,
|
||||
cleanupLeasesUponShardCompletion,
|
||||
0L);
|
||||
ShardSyncTask shardSyncTask = new ShardSyncTask(streamConfig.getStreamProxy(),
|
||||
leaseCoordinator.getLeaseManager(), initialPosition, cleanupLeasesUponShardCompletion, 0L);
|
||||
result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call();
|
||||
} else {
|
||||
LOG.info("Skipping shard sync per config setting (and lease table is not empty)");
|
||||
|
|
@ -461,14 +472,13 @@ public class Worker implements Runnable {
|
|||
}
|
||||
|
||||
/**
|
||||
* NOTE: This method is internal/private to the Worker class. It has package
|
||||
* access solely for testing.
|
||||
* NOTE: This method is internal/private to the Worker class. It has package access solely for testing.
|
||||
*
|
||||
* This method relies on ShardInfo.equals() method returning true for ShardInfo objects which may have been
|
||||
* instantiated with parentShardIds in a different order (and rest of the fields being the equal). For example
|
||||
* shardInfo1.equals(shardInfo2) should return true with shardInfo1 and shardInfo2 defined as follows.
|
||||
* ShardInfo shardInfo1 = new ShardInfo(shardId1, concurrencyToken1, Arrays.asList("parent1", "parent2"));
|
||||
* ShardInfo shardInfo2 = new ShardInfo(shardId1, concurrencyToken1, Arrays.asList("parent2", "parent1"));
|
||||
* shardInfo1.equals(shardInfo2) should return true with shardInfo1 and shardInfo2 defined as follows. ShardInfo
|
||||
* shardInfo1 = new ShardInfo(shardId1, concurrencyToken1, Arrays.asList("parent1", "parent2")); ShardInfo
|
||||
* shardInfo2 = new ShardInfo(shardId1, concurrencyToken1, Arrays.asList("parent2", "parent1"));
|
||||
*/
|
||||
void cleanupShardConsumers(Set<ShardInfo> assignedShards) {
|
||||
for (ShardInfo shard : shardInfoShardConsumerMap.keySet()) {
|
||||
|
|
@ -508,8 +518,57 @@ public class Worker implements Runnable {
|
|||
}
|
||||
|
||||
/**
|
||||
* Requests shutdown of the worker, notifying record processors, that implement {@link IShutdownNotificationAware},
|
||||
* of the impending shutdown. This gives the record processor a final chance to checkpoint.
|
||||
* Starts the requestedShutdown process, and returns a future that can be used to track the process.
|
||||
*
|
||||
* This is deprecated in favor of {@link #startGracefulShutdown()}, which returns a more complete future, and
|
||||
* indicates the process behavior
|
||||
*
|
||||
* @return a future that will be set once shutdown is completed.
|
||||
*/
|
||||
@Deprecated
|
||||
public Future<Void> requestShutdown() {
|
||||
|
||||
Future<Boolean> requestedShutdownFuture = startGracefulShutdown();
|
||||
|
||||
return new Future<Void>() {
|
||||
|
||||
@Override
|
||||
public boolean cancel(boolean mayInterruptIfRunning) {
|
||||
return requestedShutdownFuture.cancel(mayInterruptIfRunning);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isCancelled() {
|
||||
return requestedShutdownFuture.isCancelled();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isDone() {
|
||||
return requestedShutdownFuture.isDone();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Void get() throws InterruptedException, ExecutionException {
|
||||
requestedShutdownFuture.get();
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Void get(long timeout, TimeUnit unit)
|
||||
throws InterruptedException, ExecutionException, TimeoutException {
|
||||
requestedShutdownFuture.get(timeout, unit);
|
||||
return null;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Requests a graceful shutdown of the worker, notifying record processors, that implement
|
||||
* {@link IShutdownNotificationAware}, of the impending shutdown. This gives the record processor a final chance to
|
||||
* checkpoint.
|
||||
*
|
||||
* This will only create a single shutdown future. Additional attempts to start a graceful shutdown will return the
|
||||
* previous future.
|
||||
*
|
||||
* <b>It's possible that a record processor won't be notify before being shutdown. This can occur if the lease is
|
||||
* lost after requesting shutdown, but before the notification is dispatched.</b>
|
||||
|
|
@ -533,12 +592,50 @@ public class Worker implements Runnable {
|
|||
* <li>Once the worker shutdown is complete, the returned future is completed.</li>
|
||||
* </ol>
|
||||
*
|
||||
*
|
||||
*
|
||||
* @return a Future that will be set once the shutdown is complete.
|
||||
* @return a future that will be set once the shutdown has completed. True indicates that the graceful shutdown
|
||||
* completed successfully. A false value indicates that a non-exception case caused the shutdown process to
|
||||
* terminate early.
|
||||
*/
|
||||
public Future<Void> requestShutdown() {
|
||||
public Future<Boolean> startGracefulShutdown() {
|
||||
synchronized (this) {
|
||||
if (gracefulShutdownFuture == null) {
|
||||
gracefulShutdownFuture = gracefulShutdownCoordinator
|
||||
.startGracefulShutdown(createGracefulShutdownCallable());
|
||||
}
|
||||
}
|
||||
return gracefulShutdownFuture;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a callable that will execute the graceful shutdown process. This callable can be used to execute graceful
|
||||
* shutdowns in your own executor, or execute the shutdown synchronously.
|
||||
*
|
||||
* @return a callable that run the graceful shutdown process. This may return a callable that return true if the
|
||||
* graceful shutdown has already been completed.
|
||||
* @throws IllegalStateException
|
||||
* thrown by the callable if another callable has already started the shutdown process.
|
||||
*/
|
||||
public Callable<Boolean> createGracefulShutdownCallable() {
|
||||
if (isShutdownComplete()) {
|
||||
return () -> true;
|
||||
}
|
||||
Callable<GracefulShutdownContext> startShutdown = createWorkerShutdownCallable();
|
||||
return gracefulShutdownCoordinator.createGracefulShutdownCallable(startShutdown);
|
||||
}
|
||||
|
||||
public boolean hasGracefulShutdownStarted() {
|
||||
return gracefuleShutdownStarted;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
Callable<GracefulShutdownContext> createWorkerShutdownCallable() {
|
||||
return () -> {
|
||||
synchronized (this) {
|
||||
if (this.gracefuleShutdownStarted) {
|
||||
throw new IllegalStateException("Requested shutdown has already been started");
|
||||
}
|
||||
this.gracefuleShutdownStarted = true;
|
||||
}
|
||||
//
|
||||
// Stop accepting new leases. Once we do this we can be sure that
|
||||
// no more leases will be acquired.
|
||||
|
|
@ -551,13 +648,13 @@ public class Worker implements Runnable {
|
|||
// If there are no leases notification is already completed, but we still need to shutdown the worker.
|
||||
//
|
||||
this.shutdown();
|
||||
return Futures.immediateFuture(null);
|
||||
return GracefulShutdownContext.SHUTDOWN_ALREADY_COMPLETED;
|
||||
}
|
||||
CountDownLatch shutdownCompleteLatch = new CountDownLatch(leases.size());
|
||||
CountDownLatch notificationCompleteLatch = new CountDownLatch(leases.size());
|
||||
for (KinesisClientLease lease : leases) {
|
||||
ShutdownNotification shutdownNotification = new ShardConsumerShutdownNotification(leaseCoordinator, lease,
|
||||
notificationCompleteLatch, shutdownCompleteLatch);
|
||||
ShutdownNotification shutdownNotification = new ShardConsumerShutdownNotification(leaseCoordinator,
|
||||
lease, notificationCompleteLatch, shutdownCompleteLatch);
|
||||
ShardInfo shardInfo = KinesisClientLibLeaseCoordinator.convertLeaseToAssignment(lease);
|
||||
ShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo);
|
||||
if (consumer != null) {
|
||||
|
|
@ -572,9 +669,8 @@ public class Worker implements Runnable {
|
|||
shutdownCompleteLatch.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
return new ShutdownFuture(shutdownCompleteLatch, notificationCompleteLatch, this);
|
||||
|
||||
return new GracefulShutdownContext(shutdownCompleteLatch, notificationCompleteLatch, this);
|
||||
};
|
||||
}
|
||||
|
||||
boolean isShutdownComplete() {
|
||||
|
|
@ -618,8 +714,8 @@ public class Worker implements Runnable {
|
|||
}
|
||||
|
||||
/**
|
||||
* Perform final shutdown related tasks for the worker including shutting down worker owned
|
||||
* executor services, threads, etc.
|
||||
* Perform final shutdown related tasks for the worker including shutting down worker owned executor services,
|
||||
* threads, etc.
|
||||
*/
|
||||
private void finalShutdown() {
|
||||
LOG.info("Starting worker's final shutdown.");
|
||||
|
|
@ -660,11 +756,12 @@ public class Worker implements Runnable {
|
|||
}
|
||||
|
||||
/**
|
||||
* NOTE: This method is internal/private to the Worker class. It has package
|
||||
* access solely for testing.
|
||||
* NOTE: This method is internal/private to the Worker class. It has package access solely for testing.
|
||||
*
|
||||
* @param shardInfo Kinesis shard info
|
||||
* @param factory RecordProcessor factory
|
||||
* @param shardInfo
|
||||
* Kinesis shard info
|
||||
* @param factory
|
||||
* RecordProcessor factory
|
||||
* @return ShardConsumer for the shard
|
||||
*/
|
||||
ShardConsumer createOrGetShardConsumer(ShardInfo shardInfo, IRecordProcessorFactory factory) {
|
||||
|
|
@ -688,15 +785,15 @@ public class Worker implements Runnable {
|
|||
|
||||
return new ShardConsumer(shardInfo, streamConfig, checkpointTracker, recordProcessor,
|
||||
leaseCoordinator.getLeaseManager(), parentShardPollIntervalMillis, cleanupLeasesUponShardCompletion,
|
||||
executorService, metricsFactory, taskBackoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist);
|
||||
executorService, metricsFactory, taskBackoffTimeMillis,
|
||||
skipShardSyncAtWorkerInitializationIfLeasesExist);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Logger for suppressing too much INFO logging. To avoid too much logging
|
||||
* information Worker will output logging at INFO level for a single pass
|
||||
* through the main loop every minute. At DEBUG level it will output all
|
||||
* INFO logs on every pass.
|
||||
* Logger for suppressing too much INFO logging. To avoid too much logging information Worker will output logging at
|
||||
* INFO level for a single pass through the main loop every minute. At DEBUG level it will output all INFO logs on
|
||||
* every pass.
|
||||
*/
|
||||
private static class WorkerLog {
|
||||
|
||||
|
|
@ -752,90 +849,89 @@ public class Worker implements Runnable {
|
|||
|
||||
// Backwards compatible constructors
|
||||
/**
|
||||
* This constructor is for binary compatibility with code compiled against
|
||||
* version of the KCL that only have constructors taking "Client" objects.
|
||||
* This constructor is for binary compatibility with code compiled against version of the KCL that only have
|
||||
* constructors taking "Client" objects.
|
||||
*
|
||||
* @param recordProcessorFactory Used to get record processor instances for processing data from shards
|
||||
* @param config Kinesis Client Library configuration
|
||||
* @param kinesisClient Kinesis Client used for fetching data
|
||||
* @param dynamoDBClient DynamoDB client used for checkpoints and tracking leases
|
||||
* @param cloudWatchClient CloudWatch Client for publishing metrics
|
||||
* @param recordProcessorFactory
|
||||
* Used to get record processor instances for processing data from shards
|
||||
* @param config
|
||||
* Kinesis Client Library configuration
|
||||
* @param kinesisClient
|
||||
* Kinesis Client used for fetching data
|
||||
* @param dynamoDBClient
|
||||
* DynamoDB client used for checkpoints and tracking leases
|
||||
* @param cloudWatchClient
|
||||
* CloudWatch Client for publishing metrics
|
||||
*/
|
||||
public Worker(
|
||||
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory,
|
||||
KinesisClientLibConfiguration config,
|
||||
AmazonKinesisClient kinesisClient,
|
||||
AmazonDynamoDBClient dynamoDBClient,
|
||||
AmazonCloudWatchClient cloudWatchClient) {
|
||||
this(recordProcessorFactory,
|
||||
config,
|
||||
(AmazonKinesis) kinesisClient,
|
||||
(AmazonDynamoDB) dynamoDBClient,
|
||||
KinesisClientLibConfiguration config, AmazonKinesisClient kinesisClient,
|
||||
AmazonDynamoDBClient dynamoDBClient, AmazonCloudWatchClient cloudWatchClient) {
|
||||
this(recordProcessorFactory, config, (AmazonKinesis) kinesisClient, (AmazonDynamoDB) dynamoDBClient,
|
||||
(AmazonCloudWatch) cloudWatchClient);
|
||||
}
|
||||
|
||||
/**
|
||||
* This constructor is for binary compatibility with code compiled against
|
||||
* version of the KCL that only have constructors taking "Client" objects.
|
||||
* This constructor is for binary compatibility with code compiled against version of the KCL that only have
|
||||
* constructors taking "Client" objects.
|
||||
*
|
||||
* @param recordProcessorFactory Used to get record processor instances for processing data from shards
|
||||
* @param config Kinesis Client Library configuration
|
||||
* @param kinesisClient Kinesis Client used for fetching data
|
||||
* @param dynamoDBClient DynamoDB client used for checkpoints and tracking leases
|
||||
* @param cloudWatchClient CloudWatch Client for publishing metrics
|
||||
* @param execService ExecutorService to use for processing records (support for multi-threaded
|
||||
* consumption)
|
||||
* @param recordProcessorFactory
|
||||
* Used to get record processor instances for processing data from shards
|
||||
* @param config
|
||||
* Kinesis Client Library configuration
|
||||
* @param kinesisClient
|
||||
* Kinesis Client used for fetching data
|
||||
* @param dynamoDBClient
|
||||
* DynamoDB client used for checkpoints and tracking leases
|
||||
* @param cloudWatchClient
|
||||
* CloudWatch Client for publishing metrics
|
||||
* @param execService
|
||||
* ExecutorService to use for processing records (support for multi-threaded consumption)
|
||||
*/
|
||||
public Worker(
|
||||
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory,
|
||||
KinesisClientLibConfiguration config,
|
||||
AmazonKinesisClient kinesisClient,
|
||||
AmazonDynamoDBClient dynamoDBClient,
|
||||
AmazonCloudWatchClient cloudWatchClient,
|
||||
ExecutorService execService) {
|
||||
this(recordProcessorFactory,
|
||||
config,
|
||||
(AmazonKinesis) kinesisClient,
|
||||
(AmazonDynamoDB) dynamoDBClient,
|
||||
(AmazonCloudWatch) cloudWatchClient,
|
||||
execService);
|
||||
KinesisClientLibConfiguration config, AmazonKinesisClient kinesisClient,
|
||||
AmazonDynamoDBClient dynamoDBClient, AmazonCloudWatchClient cloudWatchClient, ExecutorService execService) {
|
||||
this(recordProcessorFactory, config, (AmazonKinesis) kinesisClient, (AmazonDynamoDB) dynamoDBClient,
|
||||
(AmazonCloudWatch) cloudWatchClient, execService);
|
||||
}
|
||||
|
||||
/**
|
||||
* This constructor is for binary compatibility with code compiled against
|
||||
* version of the KCL that only have constructors taking "Client" objects.
|
||||
* This constructor is for binary compatibility with code compiled against version of the KCL that only have
|
||||
* constructors taking "Client" objects.
|
||||
*
|
||||
* @param recordProcessorFactory Used to get record processor instances for processing data from shards
|
||||
* @param config Kinesis Client Library configuration
|
||||
* @param kinesisClient Kinesis Client used for fetching data
|
||||
* @param dynamoDBClient DynamoDB client used for checkpoints and tracking leases
|
||||
* @param metricsFactory Metrics factory used to emit metrics
|
||||
* @param execService ExecutorService to use for processing records (support for multi-threaded
|
||||
* consumption)
|
||||
* @param recordProcessorFactory
|
||||
* Used to get record processor instances for processing data from shards
|
||||
* @param config
|
||||
* Kinesis Client Library configuration
|
||||
* @param kinesisClient
|
||||
* Kinesis Client used for fetching data
|
||||
* @param dynamoDBClient
|
||||
* DynamoDB client used for checkpoints and tracking leases
|
||||
* @param metricsFactory
|
||||
* Metrics factory used to emit metrics
|
||||
* @param execService
|
||||
* ExecutorService to use for processing records (support for multi-threaded consumption)
|
||||
*/
|
||||
public Worker(
|
||||
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory,
|
||||
KinesisClientLibConfiguration config,
|
||||
AmazonKinesisClient kinesisClient,
|
||||
AmazonDynamoDBClient dynamoDBClient,
|
||||
IMetricsFactory metricsFactory,
|
||||
ExecutorService execService) {
|
||||
this(recordProcessorFactory,
|
||||
config,
|
||||
(AmazonKinesis) kinesisClient,
|
||||
(AmazonDynamoDB) dynamoDBClient,
|
||||
metricsFactory,
|
||||
execService);
|
||||
KinesisClientLibConfiguration config, AmazonKinesisClient kinesisClient,
|
||||
AmazonDynamoDBClient dynamoDBClient, IMetricsFactory metricsFactory, ExecutorService execService) {
|
||||
this(recordProcessorFactory, config, (AmazonKinesis) kinesisClient, (AmazonDynamoDB) dynamoDBClient,
|
||||
metricsFactory, execService);
|
||||
}
|
||||
|
||||
/**
|
||||
* Given configuration, returns appropriate metrics factory.
|
||||
* @param cloudWatchClient Amazon CloudWatch client
|
||||
* @param config KinesisClientLibConfiguration
|
||||
*
|
||||
* @param cloudWatchClient
|
||||
* Amazon CloudWatch client
|
||||
* @param config
|
||||
* KinesisClientLibConfiguration
|
||||
* @return Returns metrics factory based on the config.
|
||||
*/
|
||||
private static IMetricsFactory getMetricsFactory(
|
||||
AmazonCloudWatch cloudWatchClient, KinesisClientLibConfiguration config) {
|
||||
private static IMetricsFactory getMetricsFactory(AmazonCloudWatch cloudWatchClient,
|
||||
KinesisClientLibConfiguration config) {
|
||||
IMetricsFactory metricsFactory;
|
||||
if (config.getMetricsLevel() == MetricsLevel.NONE) {
|
||||
metricsFactory = new NullMetricsFactory();
|
||||
|
|
@ -845,12 +941,8 @@ public class Worker implements Runnable {
|
|||
cloudWatchClient.setRegion(region);
|
||||
LOG.debug("The region of Amazon CloudWatch client has been set to " + config.getRegionName());
|
||||
}
|
||||
metricsFactory = new WorkerCWMetricsFactory(
|
||||
cloudWatchClient,
|
||||
config.getApplicationName(),
|
||||
config.getMetricsBufferTimeMillis(),
|
||||
config.getMetricsMaxQueueSize(),
|
||||
config.getMetricsLevel(),
|
||||
metricsFactory = new WorkerCWMetricsFactory(cloudWatchClient, config.getApplicationName(),
|
||||
config.getMetricsBufferTimeMillis(), config.getMetricsMaxQueueSize(), config.getMetricsLevel(),
|
||||
config.getMetricsEnabledDimensions());
|
||||
}
|
||||
return metricsFactory;
|
||||
|
|
@ -858,6 +950,7 @@ public class Worker implements Runnable {
|
|||
|
||||
/**
|
||||
* Returns default executor service that should be used by the worker.
|
||||
*
|
||||
* @return Default executor service that should be used by the worker.
|
||||
*/
|
||||
private static ExecutorService getExecutorService() {
|
||||
|
|
@ -866,26 +959,19 @@ public class Worker implements Runnable {
|
|||
}
|
||||
|
||||
/**
|
||||
* Extension to CWMetricsFactory, so worker can identify whether it owns the metrics factory instance
|
||||
* or not.
|
||||
* Extension to CWMetricsFactory, so worker can identify whether it owns the metrics factory instance or not.
|
||||
* Visible and non-final only for testing.
|
||||
*/
|
||||
static class WorkerCWMetricsFactory extends CWMetricsFactory {
|
||||
|
||||
WorkerCWMetricsFactory(AmazonCloudWatch cloudWatchClient,
|
||||
String namespace,
|
||||
long bufferTimeMillis,
|
||||
int maxQueueSize,
|
||||
MetricsLevel metricsLevel,
|
||||
Set<String> metricsEnabledDimensions) {
|
||||
super(cloudWatchClient, namespace, bufferTimeMillis,
|
||||
maxQueueSize, metricsLevel, metricsEnabledDimensions);
|
||||
WorkerCWMetricsFactory(AmazonCloudWatch cloudWatchClient, String namespace, long bufferTimeMillis,
|
||||
int maxQueueSize, MetricsLevel metricsLevel, Set<String> metricsEnabledDimensions) {
|
||||
super(cloudWatchClient, namespace, bufferTimeMillis, maxQueueSize, metricsLevel, metricsEnabledDimensions);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Extension to ThreadPoolExecutor, so worker can identify whether it owns the executor service instance
|
||||
* or not.
|
||||
* Extension to ThreadPoolExecutor, so worker can identify whether it owns the executor service instance or not.
|
||||
* Visible and non-final only for testing.
|
||||
*/
|
||||
static class WorkerThreadPoolExecutor extends ThreadPoolExecutor {
|
||||
|
|
@ -919,24 +1005,25 @@ public class Worker implements Runnable {
|
|||
}
|
||||
|
||||
/**
|
||||
* Provide a V1
|
||||
* {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor IRecordProcessor}.
|
||||
* Provide a V1 {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor
|
||||
* IRecordProcessor}.
|
||||
*
|
||||
* @param recordProcessorFactory Used to get record processor instances for processing data from shards
|
||||
* @param recordProcessorFactory
|
||||
* Used to get record processor instances for processing data from shards
|
||||
* @return A reference to this updated object so that method calls can be chained together.
|
||||
*/
|
||||
public Builder recordProcessorFactory(
|
||||
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory
|
||||
recordProcessorFactory) {
|
||||
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory) {
|
||||
this.recordProcessorFactory = new V1ToV2RecordProcessorFactoryAdapter(recordProcessorFactory);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Provide a V2
|
||||
* {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor IRecordProcessor}.
|
||||
* Provide a V2 {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor
|
||||
* IRecordProcessor}.
|
||||
*
|
||||
* @param recordProcessorFactory Used to get record processor instances for processing data from shards
|
||||
* @param recordProcessorFactory
|
||||
* Used to get record processor instances for processing data from shards
|
||||
* @return A reference to this updated object so that method calls can be chained together.
|
||||
*/
|
||||
public Builder recordProcessorFactory(IRecordProcessorFactory recordProcessorFactory) {
|
||||
|
|
@ -947,7 +1034,8 @@ public class Worker implements Runnable {
|
|||
/**
|
||||
* Set the Worker config.
|
||||
*
|
||||
* @param config Kinesis Client Library configuration
|
||||
* @param config
|
||||
* Kinesis Client Library configuration
|
||||
* @return A reference to this updated object so that method calls can be chained together.
|
||||
*/
|
||||
public Builder config(KinesisClientLibConfiguration config) {
|
||||
|
|
@ -958,7 +1046,8 @@ public class Worker implements Runnable {
|
|||
/**
|
||||
* Set the Kinesis client.
|
||||
*
|
||||
* @param kinesisClient Kinesis Client used for fetching data
|
||||
* @param kinesisClient
|
||||
* Kinesis Client used for fetching data
|
||||
* @return A reference to this updated object so that method calls can be chained together.
|
||||
*/
|
||||
public Builder kinesisClient(AmazonKinesis kinesisClient) {
|
||||
|
|
@ -969,7 +1058,8 @@ public class Worker implements Runnable {
|
|||
/**
|
||||
* Set the DynamoDB client.
|
||||
*
|
||||
* @param dynamoDBClient DynamoDB client used for checkpoints and tracking leases
|
||||
* @param dynamoDBClient
|
||||
* DynamoDB client used for checkpoints and tracking leases
|
||||
* @return A reference to this updated object so that method calls can be chained together.
|
||||
*/
|
||||
public Builder dynamoDBClient(AmazonDynamoDB dynamoDBClient) {
|
||||
|
|
@ -980,7 +1070,8 @@ public class Worker implements Runnable {
|
|||
/**
|
||||
* Set the Cloudwatch client.
|
||||
*
|
||||
* @param cloudWatchClient CloudWatch Client for publishing metrics
|
||||
* @param cloudWatchClient
|
||||
* CloudWatch Client for publishing metrics
|
||||
* @return A reference to this updated object so that method calls can be chained together.
|
||||
*/
|
||||
public Builder cloudWatchClient(AmazonCloudWatch cloudWatchClient) {
|
||||
|
|
@ -991,7 +1082,8 @@ public class Worker implements Runnable {
|
|||
/**
|
||||
* Set the metrics factory.
|
||||
*
|
||||
* @param metricsFactory Metrics factory used to emit metrics
|
||||
* @param metricsFactory
|
||||
* Metrics factory used to emit metrics
|
||||
* @return A reference to this updated object so that method calls can be chained together.
|
||||
*/
|
||||
public Builder metricsFactory(IMetricsFactory metricsFactory) {
|
||||
|
|
@ -1002,7 +1094,8 @@ public class Worker implements Runnable {
|
|||
/**
|
||||
* Set the executor service for processing records.
|
||||
*
|
||||
* @param execService ExecutorService to use for processing records (support for multi-threaded consumption)
|
||||
* @param execService
|
||||
* ExecutorService to use for processing records (support for multi-threaded consumption)
|
||||
* @return A reference to this updated object so that method calls can be chained together.
|
||||
*/
|
||||
public Builder execService(ExecutorService execService) {
|
||||
|
|
@ -1036,8 +1129,7 @@ public class Worker implements Runnable {
|
|||
"Kinesis Client Library configuration needs to be provided to build Worker");
|
||||
}
|
||||
if (recordProcessorFactory == null) {
|
||||
throw new IllegalArgumentException(
|
||||
"A Record Processor Factory needs to be provided to build Worker");
|
||||
throw new IllegalArgumentException("A Record Processor Factory needs to be provided to build Worker");
|
||||
}
|
||||
|
||||
if (execService == null) {
|
||||
|
|
@ -1090,6 +1182,7 @@ public class Worker implements Runnable {
|
|||
shardPrioritization = new ParentsFirstShardPrioritization(1);
|
||||
}
|
||||
|
||||
|
||||
return new Worker(config.getApplicationName(),
|
||||
recordProcessorFactory,
|
||||
new StreamConfig(new KinesisProxyFactory(config.getKinesisCredentialsProvider(),
|
||||
|
|
|
|||
|
|
@ -1,3 +1,18 @@
|
|||
/*
|
||||
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
|
||||
// Generated by the protocol buffer compiler. DO NOT EDIT!
|
||||
// source: messages.proto
|
||||
|
||||
|
|
|
|||
|
|
@ -1,3 +1,17 @@
|
|||
/*
|
||||
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.clientlibrary.utils;
|
||||
|
||||
import java.util.concurrent.Executors;
|
||||
|
|
|
|||
|
|
@ -1,3 +1,17 @@
|
|||
/*
|
||||
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||
|
||||
import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerStates.ConsumerState;
|
||||
|
|
|
|||
|
|
@ -0,0 +1,322 @@
|
|||
/*
|
||||
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.equalTo;
|
||||
import static org.hamcrest.CoreMatchers.not;
|
||||
import static org.junit.Assert.assertThat;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyLong;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.runners.MockitoJUnitRunner;
|
||||
import org.mockito.verification.VerificationMode;
|
||||
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
public class GracefulShutdownCoordinatorTest {
|
||||
|
||||
@Mock
|
||||
private CountDownLatch shutdownCompleteLatch;
|
||||
@Mock
|
||||
private CountDownLatch notificationCompleteLatch;
|
||||
@Mock
|
||||
private Worker worker;
|
||||
@Mock
|
||||
private Callable<GracefulShutdownContext> contextCallable;
|
||||
@Mock
|
||||
private ConcurrentMap<ShardInfo, ShardConsumer> shardInfoConsumerMap;
|
||||
|
||||
@Test
|
||||
public void testAllShutdownCompletedAlready() throws Exception {
|
||||
Callable<Boolean> requestedShutdownCallable = buildRequestedShutdownCallable();
|
||||
|
||||
when(shutdownCompleteLatch.await(anyLong(), any(TimeUnit.class))).thenReturn(true);
|
||||
when(notificationCompleteLatch.await(anyLong(), any(TimeUnit.class))).thenReturn(true);
|
||||
|
||||
assertThat(requestedShutdownCallable.call(), equalTo(true));
|
||||
verify(shutdownCompleteLatch).await(anyLong(), any(TimeUnit.class));
|
||||
verify(notificationCompleteLatch).await(anyLong(), any(TimeUnit.class));
|
||||
verify(worker).shutdown();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNotificationNotCompletedYet() throws Exception {
|
||||
Callable<Boolean> requestedShutdownCallable = buildRequestedShutdownCallable();
|
||||
|
||||
mockLatchAwait(notificationCompleteLatch, false, true);
|
||||
when(notificationCompleteLatch.getCount()).thenReturn(1L, 0L);
|
||||
mockLatchAwait(shutdownCompleteLatch, true);
|
||||
when(shutdownCompleteLatch.getCount()).thenReturn(1L, 1L, 0L);
|
||||
|
||||
when(worker.isShutdownComplete()).thenReturn(false, true);
|
||||
mockShardInfoConsumerMap(1, 0);
|
||||
|
||||
assertThat(requestedShutdownCallable.call(), equalTo(true));
|
||||
verify(notificationCompleteLatch, times(2)).await(anyLong(), any(TimeUnit.class));
|
||||
verify(notificationCompleteLatch).getCount();
|
||||
|
||||
verify(shutdownCompleteLatch).await(anyLong(), any(TimeUnit.class));
|
||||
verify(shutdownCompleteLatch, times(2)).getCount();
|
||||
|
||||
verify(worker).shutdown();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testShutdownNotCompletedYet() throws Exception {
|
||||
Callable<Boolean> requestedShutdownCallable = buildRequestedShutdownCallable();
|
||||
|
||||
mockLatchAwait(notificationCompleteLatch, true);
|
||||
mockLatchAwait(shutdownCompleteLatch, false, true);
|
||||
when(shutdownCompleteLatch.getCount()).thenReturn(1L, 0L);
|
||||
|
||||
when(worker.isShutdownComplete()).thenReturn(false, true);
|
||||
mockShardInfoConsumerMap(1, 0);
|
||||
|
||||
assertThat(requestedShutdownCallable.call(), equalTo(true));
|
||||
verify(notificationCompleteLatch).await(anyLong(), any(TimeUnit.class));
|
||||
verify(notificationCompleteLatch, never()).getCount();
|
||||
|
||||
verify(shutdownCompleteLatch, times(2)).await(anyLong(), any(TimeUnit.class));
|
||||
verify(shutdownCompleteLatch, times(2)).getCount();
|
||||
|
||||
verify(worker).shutdown();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultipleAttemptsForNotification() throws Exception {
|
||||
Callable<Boolean> requestedShutdownCallable = buildRequestedShutdownCallable();
|
||||
|
||||
mockLatchAwait(notificationCompleteLatch, false, false, true);
|
||||
when(notificationCompleteLatch.getCount()).thenReturn(2L, 1L, 0L);
|
||||
|
||||
mockLatchAwait(shutdownCompleteLatch, true);
|
||||
when(shutdownCompleteLatch.getCount()).thenReturn(2L, 2L, 1L, 1L, 0L);
|
||||
|
||||
when(worker.isShutdownComplete()).thenReturn(false, false, false, true);
|
||||
mockShardInfoConsumerMap(2, 1, 0);
|
||||
|
||||
assertThat(requestedShutdownCallable.call(), equalTo(true));
|
||||
|
||||
verifyLatchAwait(notificationCompleteLatch, 3);
|
||||
verify(notificationCompleteLatch, times(2)).getCount();
|
||||
|
||||
verifyLatchAwait(shutdownCompleteLatch, 1);
|
||||
verify(shutdownCompleteLatch, times(4)).getCount();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWorkerAlreadyShutdownAtNotification() throws Exception {
|
||||
Callable<Boolean> requestedShutdownCallable = buildRequestedShutdownCallable();
|
||||
|
||||
mockLatchAwait(notificationCompleteLatch, false, true);
|
||||
when(notificationCompleteLatch.getCount()).thenReturn(1L, 0L);
|
||||
|
||||
mockLatchAwait(shutdownCompleteLatch, true);
|
||||
when(shutdownCompleteLatch.getCount()).thenReturn(1L, 1L, 0L);
|
||||
|
||||
when(worker.isShutdownComplete()).thenReturn(true);
|
||||
mockShardInfoConsumerMap(0);
|
||||
|
||||
assertThat(requestedShutdownCallable.call(), equalTo(false));
|
||||
|
||||
verifyLatchAwait(notificationCompleteLatch);
|
||||
verify(notificationCompleteLatch).getCount();
|
||||
|
||||
verifyLatchAwait(shutdownCompleteLatch, never());
|
||||
verify(shutdownCompleteLatch, times(3)).getCount();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWorkerAlreadyShutdownAtComplete() throws Exception {
|
||||
Callable<Boolean> requestedShutdownCallable = buildRequestedShutdownCallable();
|
||||
|
||||
mockLatchAwait(notificationCompleteLatch, true);
|
||||
|
||||
mockLatchAwait(shutdownCompleteLatch, false, true);
|
||||
when(shutdownCompleteLatch.getCount()).thenReturn(1L, 1L, 1L);
|
||||
|
||||
when(worker.isShutdownComplete()).thenReturn(true);
|
||||
mockShardInfoConsumerMap(0);
|
||||
|
||||
assertThat(requestedShutdownCallable.call(), equalTo(false));
|
||||
|
||||
verifyLatchAwait(notificationCompleteLatch);
|
||||
verify(notificationCompleteLatch, never()).getCount();
|
||||
|
||||
verifyLatchAwait(shutdownCompleteLatch);
|
||||
verify(shutdownCompleteLatch, times(3)).getCount();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNotificationInterrupted() throws Exception {
|
||||
Callable<Boolean> requestedShutdownCallable = buildRequestedShutdownCallable();
|
||||
|
||||
when(notificationCompleteLatch.await(anyLong(), any(TimeUnit.class))).thenThrow(new InterruptedException());
|
||||
when(notificationCompleteLatch.getCount()).thenReturn(1L);
|
||||
|
||||
when(shutdownCompleteLatch.getCount()).thenReturn(1L);
|
||||
|
||||
assertThat(requestedShutdownCallable.call(), equalTo(false));
|
||||
verifyLatchAwait(notificationCompleteLatch);
|
||||
verifyLatchAwait(shutdownCompleteLatch, never());
|
||||
verify(worker, never()).shutdown();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testShutdownInterrupted() throws Exception {
|
||||
Callable<Boolean> requestedShutdownCallable = buildRequestedShutdownCallable();
|
||||
|
||||
when(notificationCompleteLatch.await(anyLong(), any(TimeUnit.class))).thenReturn(true);
|
||||
|
||||
when(shutdownCompleteLatch.await(anyLong(), any(TimeUnit.class))).thenThrow(new InterruptedException());
|
||||
when(shutdownCompleteLatch.getCount()).thenReturn(1L);
|
||||
|
||||
assertThat(requestedShutdownCallable.call(), equalTo(false));
|
||||
verifyLatchAwait(notificationCompleteLatch);
|
||||
verifyLatchAwait(shutdownCompleteLatch);
|
||||
verify(worker).shutdown();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInterruptedAfterNotification() throws Exception {
|
||||
Callable<Boolean> requestedShutdownCallable = buildRequestedShutdownCallable();
|
||||
|
||||
when(notificationCompleteLatch.await(anyLong(), any(TimeUnit.class))).thenAnswer(invocation -> {
|
||||
Thread.currentThread().interrupt();
|
||||
return true;
|
||||
});
|
||||
|
||||
assertThat(requestedShutdownCallable.call(), equalTo(false));
|
||||
verifyLatchAwait(notificationCompleteLatch);
|
||||
verifyLatchAwait(shutdownCompleteLatch, never());
|
||||
verify(worker, never()).shutdown();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInterruptedAfterWorkerShutdown() throws Exception {
|
||||
Callable<Boolean> requestedShutdownCallable = buildRequestedShutdownCallable();
|
||||
|
||||
when(notificationCompleteLatch.await(anyLong(), any(TimeUnit.class))).thenReturn(true);
|
||||
|
||||
doAnswer(invocation -> {
|
||||
Thread.currentThread().interrupt();
|
||||
return true;
|
||||
}).when(worker).shutdown();
|
||||
|
||||
assertThat(requestedShutdownCallable.call(), equalTo(false));
|
||||
verifyLatchAwait(notificationCompleteLatch);
|
||||
verifyLatchAwait(shutdownCompleteLatch, never());
|
||||
verify(worker).shutdown();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInterruptedDuringNotification() throws Exception {
|
||||
Callable<Boolean> requestedShutdownCallable = buildRequestedShutdownCallable();
|
||||
|
||||
when(notificationCompleteLatch.await(anyLong(), any(TimeUnit.class))).thenAnswer(invocation -> {
|
||||
Thread.currentThread().interrupt();
|
||||
return false;
|
||||
});
|
||||
when(notificationCompleteLatch.getCount()).thenReturn(1L);
|
||||
|
||||
when(shutdownCompleteLatch.getCount()).thenReturn(1L);
|
||||
|
||||
assertThat(requestedShutdownCallable.call(), equalTo(false));
|
||||
verifyLatchAwait(notificationCompleteLatch);
|
||||
verify(notificationCompleteLatch).getCount();
|
||||
|
||||
verifyLatchAwait(shutdownCompleteLatch, never());
|
||||
verify(shutdownCompleteLatch).getCount();
|
||||
|
||||
verify(worker, never()).shutdown();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInterruptedDuringShutdown() throws Exception {
|
||||
Callable<Boolean> requestedShutdownCallable = buildRequestedShutdownCallable();
|
||||
|
||||
when(notificationCompleteLatch.await(anyLong(), any(TimeUnit.class))).thenReturn(true);
|
||||
|
||||
when(shutdownCompleteLatch.await(anyLong(), any(TimeUnit.class))).thenAnswer(invocation -> {
|
||||
Thread.currentThread().interrupt();
|
||||
return false;
|
||||
});
|
||||
when(shutdownCompleteLatch.getCount()).thenReturn(1L);
|
||||
|
||||
assertThat(requestedShutdownCallable.call(), equalTo(false));
|
||||
verifyLatchAwait(notificationCompleteLatch);
|
||||
verify(notificationCompleteLatch, never()).getCount();
|
||||
|
||||
verifyLatchAwait(shutdownCompleteLatch);
|
||||
verify(shutdownCompleteLatch).getCount();
|
||||
|
||||
verify(worker).shutdown();
|
||||
}
|
||||
|
||||
@Test(expected = IllegalStateException.class)
|
||||
public void testWorkerShutdownCallableThrows() throws Exception {
|
||||
Callable<Boolean> requestedShutdownCallable = new GracefulShutdownCoordinator().createGracefulShutdownCallable(contextCallable);
|
||||
when(contextCallable.call()).thenThrow(new IllegalStateException("Bad Shutdown"));
|
||||
|
||||
requestedShutdownCallable.call();
|
||||
}
|
||||
|
||||
private void verifyLatchAwait(CountDownLatch latch) throws Exception {
|
||||
verifyLatchAwait(latch, times(1));
|
||||
}
|
||||
|
||||
private void verifyLatchAwait(CountDownLatch latch, int times) throws Exception {
|
||||
verifyLatchAwait(latch, times(times));
|
||||
}
|
||||
|
||||
private void verifyLatchAwait(CountDownLatch latch, VerificationMode verificationMode) throws Exception {
|
||||
verify(latch, verificationMode).await(anyLong(), any(TimeUnit.class));
|
||||
}
|
||||
|
||||
private void mockLatchAwait(CountDownLatch latch, Boolean initial, Boolean... remaining) throws Exception {
|
||||
when(latch.await(anyLong(), any(TimeUnit.class))).thenReturn(initial, remaining);
|
||||
}
|
||||
|
||||
private Callable<Boolean> buildRequestedShutdownCallable() throws Exception {
|
||||
GracefulShutdownContext context = new GracefulShutdownContext(shutdownCompleteLatch,
|
||||
notificationCompleteLatch, worker);
|
||||
when(contextCallable.call()).thenReturn(context);
|
||||
return new GracefulShutdownCoordinator().createGracefulShutdownCallable(contextCallable);
|
||||
}
|
||||
|
||||
private void mockShardInfoConsumerMap(Integer initialItemCount, Integer... additionalItemCounts) {
|
||||
when(worker.getShardInfoShardConsumerMap()).thenReturn(shardInfoConsumerMap);
|
||||
Boolean additionalEmptyStates[] = new Boolean[additionalItemCounts.length];
|
||||
for (int i = 0; i < additionalItemCounts.length; ++i) {
|
||||
additionalEmptyStates[i] = additionalItemCounts[i] == 0;
|
||||
}
|
||||
when(shardInfoConsumerMap.size()).thenReturn(initialItemCount, additionalItemCounts);
|
||||
when(shardInfoConsumerMap.isEmpty()).thenReturn(initialItemCount == 0, additionalEmptyStates);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -1,3 +1,17 @@
|
|||
/*
|
||||
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
|
|
|||
|
|
@ -1,236 +0,0 @@
|
|||
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyLong;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.runners.MockitoJUnitRunner;
|
||||
import org.mockito.stubbing.Answer;
|
||||
import org.mockito.stubbing.OngoingStubbing;
|
||||
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
public class ShutdownFutureTest {
|
||||
|
||||
@Mock
|
||||
private CountDownLatch shutdownCompleteLatch;
|
||||
@Mock
|
||||
private CountDownLatch notificationCompleteLatch;
|
||||
@Mock
|
||||
private Worker worker;
|
||||
@Mock
|
||||
private ConcurrentMap<ShardInfo, ShardConsumer> shardInfoConsumerMap;
|
||||
|
||||
@Test
|
||||
public void testSimpleGetAlreadyCompleted() throws Exception {
|
||||
ShutdownFuture future = new ShutdownFuture(shutdownCompleteLatch, notificationCompleteLatch, worker);
|
||||
|
||||
mockNotificationComplete(true);
|
||||
mockShutdownComplete(true);
|
||||
|
||||
future.get();
|
||||
|
||||
verify(notificationCompleteLatch).await(anyLong(), any(TimeUnit.class));
|
||||
verify(worker).shutdown();
|
||||
verify(shutdownCompleteLatch).await(anyLong(), any(TimeUnit.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNotificationNotCompleted() throws Exception {
|
||||
ShutdownFuture future = new ShutdownFuture(shutdownCompleteLatch, notificationCompleteLatch, worker);
|
||||
|
||||
mockNotificationComplete(false, true);
|
||||
mockShutdownComplete(true);
|
||||
|
||||
when(worker.getShardInfoShardConsumerMap()).thenReturn(shardInfoConsumerMap);
|
||||
when(shardInfoConsumerMap.isEmpty()).thenReturn(false);
|
||||
when(worker.isShutdownComplete()).thenReturn(false);
|
||||
|
||||
when(notificationCompleteLatch.getCount()).thenReturn(1L);
|
||||
when(shutdownCompleteLatch.getCount()).thenReturn(1L);
|
||||
|
||||
expectedTimeoutException(future);
|
||||
|
||||
verify(worker, never()).shutdown();
|
||||
|
||||
awaitFuture(future);
|
||||
|
||||
verify(notificationCompleteLatch).getCount();
|
||||
verifyLatchAwait(notificationCompleteLatch, 2);
|
||||
|
||||
verify(shutdownCompleteLatch).getCount();
|
||||
verifyLatchAwait(shutdownCompleteLatch);
|
||||
|
||||
verify(worker).shutdown();
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testShutdownNotCompleted() throws Exception {
|
||||
ShutdownFuture future = new ShutdownFuture(shutdownCompleteLatch, notificationCompleteLatch, worker);
|
||||
mockNotificationComplete(true);
|
||||
mockShutdownComplete(false, true);
|
||||
|
||||
when(shutdownCompleteLatch.getCount()).thenReturn(1L);
|
||||
when(worker.isShutdownComplete()).thenReturn(false);
|
||||
|
||||
mockShardInfoConsumerMap(1);
|
||||
|
||||
expectedTimeoutException(future);
|
||||
verify(worker).shutdown();
|
||||
awaitFuture(future);
|
||||
|
||||
verifyLatchAwait(notificationCompleteLatch, 2);
|
||||
verifyLatchAwait(shutdownCompleteLatch, 2);
|
||||
|
||||
verify(worker).isShutdownComplete();
|
||||
verify(worker).getShardInfoShardConsumerMap();
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testShutdownNotCompleteButWorkerShutdown() throws Exception {
|
||||
ShutdownFuture future = create();
|
||||
|
||||
mockNotificationComplete(true);
|
||||
mockShutdownComplete(false);
|
||||
|
||||
when(shutdownCompleteLatch.getCount()).thenReturn(1L);
|
||||
when(worker.isShutdownComplete()).thenReturn(true);
|
||||
mockShardInfoConsumerMap(1);
|
||||
|
||||
awaitFuture(future);
|
||||
verify(worker).shutdown();
|
||||
verifyLatchAwait(notificationCompleteLatch);
|
||||
verifyLatchAwait(shutdownCompleteLatch);
|
||||
|
||||
verify(worker, times(2)).isShutdownComplete();
|
||||
verify(worker).getShardInfoShardConsumerMap();
|
||||
verify(shardInfoConsumerMap).size();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testShutdownNotCompleteButShardConsumerEmpty() throws Exception {
|
||||
ShutdownFuture future = create();
|
||||
mockNotificationComplete(true);
|
||||
mockShutdownComplete(false);
|
||||
|
||||
mockOutstanding(shutdownCompleteLatch, 1L);
|
||||
|
||||
when(worker.isShutdownComplete()).thenReturn(false);
|
||||
mockShardInfoConsumerMap(0);
|
||||
|
||||
awaitFuture(future);
|
||||
verify(worker).shutdown();
|
||||
verifyLatchAwait(notificationCompleteLatch);
|
||||
verifyLatchAwait(shutdownCompleteLatch);
|
||||
|
||||
verify(worker, times(2)).isShutdownComplete();
|
||||
verify(worker, times(2)).getShardInfoShardConsumerMap();
|
||||
|
||||
verify(shardInfoConsumerMap).isEmpty();
|
||||
verify(shardInfoConsumerMap).size();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNotificationNotCompleteButShardConsumerEmpty() throws Exception {
|
||||
ShutdownFuture future = create();
|
||||
mockNotificationComplete(false);
|
||||
mockShutdownComplete(false);
|
||||
|
||||
mockOutstanding(notificationCompleteLatch, 1L);
|
||||
mockOutstanding(shutdownCompleteLatch, 1L);
|
||||
|
||||
when(worker.isShutdownComplete()).thenReturn(false);
|
||||
mockShardInfoConsumerMap(0);
|
||||
|
||||
awaitFuture(future);
|
||||
verify(worker, never()).shutdown();
|
||||
verifyLatchAwait(notificationCompleteLatch);
|
||||
verify(shutdownCompleteLatch, never()).await();
|
||||
|
||||
verify(worker, times(2)).isShutdownComplete();
|
||||
verify(worker, times(2)).getShardInfoShardConsumerMap();
|
||||
|
||||
verify(shardInfoConsumerMap).isEmpty();
|
||||
verify(shardInfoConsumerMap).size();
|
||||
}
|
||||
|
||||
@Test(expected = TimeoutException.class)
|
||||
public void testTimeExceededException() throws Exception {
|
||||
ShutdownFuture future = create();
|
||||
mockNotificationComplete(false);
|
||||
mockOutstanding(notificationCompleteLatch, 1L);
|
||||
when(worker.isShutdownComplete()).thenReturn(false);
|
||||
mockShardInfoConsumerMap(1);
|
||||
|
||||
future.get(1, TimeUnit.NANOSECONDS);
|
||||
}
|
||||
|
||||
private ShutdownFuture create() {
|
||||
return new ShutdownFuture(shutdownCompleteLatch, notificationCompleteLatch, worker);
|
||||
}
|
||||
|
||||
private void mockShardInfoConsumerMap(Integer initialItemCount, Integer ... additionalItemCounts) {
|
||||
when(worker.getShardInfoShardConsumerMap()).thenReturn(shardInfoConsumerMap);
|
||||
Boolean additionalEmptyStates[] = new Boolean[additionalItemCounts.length];
|
||||
for(int i = 0; i < additionalItemCounts.length; ++i) {
|
||||
additionalEmptyStates[i] = additionalItemCounts[i] == 0;
|
||||
}
|
||||
when(shardInfoConsumerMap.size()).thenReturn(initialItemCount, additionalItemCounts);
|
||||
when(shardInfoConsumerMap.isEmpty()).thenReturn(initialItemCount == 0, additionalEmptyStates);
|
||||
}
|
||||
|
||||
private void verifyLatchAwait(CountDownLatch latch) throws Exception {
|
||||
verifyLatchAwait(latch, 1);
|
||||
}
|
||||
|
||||
private void verifyLatchAwait(CountDownLatch latch, int times) throws Exception {
|
||||
verify(latch, times(times)).await(anyLong(), any(TimeUnit.class));
|
||||
}
|
||||
|
||||
private void expectedTimeoutException(ShutdownFuture future) throws Exception {
|
||||
boolean gotTimeout = false;
|
||||
try {
|
||||
awaitFuture(future);
|
||||
} catch (TimeoutException te) {
|
||||
gotTimeout = true;
|
||||
}
|
||||
assertThat("Expected a timeout exception to occur", gotTimeout);
|
||||
}
|
||||
|
||||
private void awaitFuture(ShutdownFuture future) throws Exception {
|
||||
future.get(1, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
private void mockNotificationComplete(Boolean initial, Boolean... states) throws Exception {
|
||||
mockLatch(notificationCompleteLatch, initial, states);
|
||||
|
||||
}
|
||||
|
||||
private void mockShutdownComplete(Boolean initial, Boolean... states) throws Exception {
|
||||
mockLatch(shutdownCompleteLatch, initial, states);
|
||||
}
|
||||
|
||||
private void mockLatch(CountDownLatch latch, Boolean initial, Boolean... states) throws Exception {
|
||||
when(latch.await(anyLong(), any(TimeUnit.class))).thenReturn(initial, states);
|
||||
}
|
||||
|
||||
private void mockOutstanding(CountDownLatch latch, Long remaining, Long ... additionalRemaining) throws Exception {
|
||||
when(latch.getCount()).thenReturn(remaining, additionalRemaining);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -1,3 +1,17 @@
|
|||
/*
|
||||
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||
|
||||
import static org.mockito.Matchers.any;
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
|
|
@ -762,7 +762,7 @@ public class WorkerTest {
|
|||
verify(executorService, atLeastOnce()).submit(argThat(
|
||||
both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.INITIALIZE))));
|
||||
|
||||
worker.requestShutdown();
|
||||
worker.createWorkerShutdownCallable().call();
|
||||
worker.runProcessLoop();
|
||||
|
||||
verify(executorService, atLeastOnce()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
|
||||
|
|
@ -781,6 +781,146 @@ public class WorkerTest {
|
|||
|
||||
}
|
||||
|
||||
@Test(expected = IllegalStateException.class)
|
||||
public void testShutdownCallableNotAllowedTwice() throws Exception {
|
||||
|
||||
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
|
||||
StreamConfig streamConfig = mock(StreamConfig.class);
|
||||
IMetricsFactory metricsFactory = mock(IMetricsFactory.class);
|
||||
|
||||
ExtendedSequenceNumber checkpoint = new ExtendedSequenceNumber("123", 0L);
|
||||
KinesisClientLeaseBuilder builder = new KinesisClientLeaseBuilder().withCheckpoint(checkpoint)
|
||||
.withConcurrencyToken(UUID.randomUUID()).withLastCounterIncrementNanos(0L).withLeaseCounter(0L)
|
||||
.withOwnerSwitchesSinceCheckpoint(0L).withLeaseOwner("Self");
|
||||
|
||||
final List<KinesisClientLease> leases = new ArrayList<>();
|
||||
final List<ShardInfo> currentAssignments = new ArrayList<>();
|
||||
KinesisClientLease lease = builder.withLeaseKey(String.format("shardId-%03d", 1)).build();
|
||||
leases.add(lease);
|
||||
currentAssignments.add(new ShardInfo(lease.getLeaseKey(), lease.getConcurrencyToken().toString(),
|
||||
lease.getParentShardIds(), lease.getCheckpoint()));
|
||||
|
||||
when(leaseCoordinator.getAssignments()).thenAnswer(new Answer<List<KinesisClientLease>>() {
|
||||
@Override
|
||||
public List<KinesisClientLease> answer(InvocationOnMock invocation) throws Throwable {
|
||||
return leases;
|
||||
}
|
||||
});
|
||||
when(leaseCoordinator.getCurrentAssignments()).thenAnswer(new Answer<List<ShardInfo>>() {
|
||||
@Override
|
||||
public List<ShardInfo> answer(InvocationOnMock invocation) throws Throwable {
|
||||
return currentAssignments;
|
||||
}
|
||||
});
|
||||
|
||||
IRecordProcessor processor = mock(IRecordProcessor.class);
|
||||
when(recordProcessorFactory.createProcessor()).thenReturn(processor);
|
||||
|
||||
Worker worker = new InjectableWorker("testRequestShutdown", recordProcessorFactory, streamConfig,
|
||||
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
|
||||
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory,
|
||||
taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization) {
|
||||
@Override
|
||||
void postConstruct() {
|
||||
this.gracefuleShutdownStarted = true;
|
||||
}
|
||||
};
|
||||
|
||||
when(executorService.submit(Matchers.<Callable<TaskResult>> any()))
|
||||
.thenAnswer(new ShutdownHandlingAnswer(taskFuture));
|
||||
when(taskFuture.isDone()).thenReturn(true);
|
||||
when(taskFuture.get()).thenReturn(taskResult);
|
||||
|
||||
worker.runProcessLoop();
|
||||
|
||||
verify(executorService, atLeastOnce()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
|
||||
.and(TaskTypeMatcher.isOfType(TaskType.BLOCK_ON_PARENT_SHARDS))));
|
||||
|
||||
worker.runProcessLoop();
|
||||
|
||||
verify(executorService, atLeastOnce()).submit(argThat(
|
||||
both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.INITIALIZE))));
|
||||
|
||||
assertThat(worker.hasGracefulShutdownStarted(), equalTo(true));
|
||||
worker.createWorkerShutdownCallable().call();
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGracefulShutdownSingleFuture() throws Exception {
|
||||
|
||||
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
|
||||
StreamConfig streamConfig = mock(StreamConfig.class);
|
||||
IMetricsFactory metricsFactory = mock(IMetricsFactory.class);
|
||||
|
||||
ExtendedSequenceNumber checkpoint = new ExtendedSequenceNumber("123", 0L);
|
||||
KinesisClientLeaseBuilder builder = new KinesisClientLeaseBuilder().withCheckpoint(checkpoint)
|
||||
.withConcurrencyToken(UUID.randomUUID()).withLastCounterIncrementNanos(0L).withLeaseCounter(0L)
|
||||
.withOwnerSwitchesSinceCheckpoint(0L).withLeaseOwner("Self");
|
||||
|
||||
final List<KinesisClientLease> leases = new ArrayList<>();
|
||||
final List<ShardInfo> currentAssignments = new ArrayList<>();
|
||||
KinesisClientLease lease = builder.withLeaseKey(String.format("shardId-%03d", 1)).build();
|
||||
leases.add(lease);
|
||||
currentAssignments.add(new ShardInfo(lease.getLeaseKey(), lease.getConcurrencyToken().toString(),
|
||||
lease.getParentShardIds(), lease.getCheckpoint()));
|
||||
|
||||
when(leaseCoordinator.getAssignments()).thenAnswer(new Answer<List<KinesisClientLease>>() {
|
||||
@Override
|
||||
public List<KinesisClientLease> answer(InvocationOnMock invocation) throws Throwable {
|
||||
return leases;
|
||||
}
|
||||
});
|
||||
when(leaseCoordinator.getCurrentAssignments()).thenAnswer(new Answer<List<ShardInfo>>() {
|
||||
@Override
|
||||
public List<ShardInfo> answer(InvocationOnMock invocation) throws Throwable {
|
||||
return currentAssignments;
|
||||
}
|
||||
});
|
||||
|
||||
IRecordProcessor processor = mock(IRecordProcessor.class);
|
||||
when(recordProcessorFactory.createProcessor()).thenReturn(processor);
|
||||
|
||||
GracefulShutdownCoordinator coordinator = mock(GracefulShutdownCoordinator.class);
|
||||
when(coordinator.createGracefulShutdownCallable(any(Callable.class))).thenReturn(() -> true);
|
||||
|
||||
Future<Boolean> gracefulShutdownFuture = mock(Future.class);
|
||||
|
||||
when(coordinator.startGracefulShutdown(any(Callable.class))).thenReturn(gracefulShutdownFuture);
|
||||
|
||||
Worker worker = new InjectableWorker("testRequestShutdown", recordProcessorFactory, streamConfig,
|
||||
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
|
||||
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory,
|
||||
taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization) {
|
||||
@Override
|
||||
void postConstruct() {
|
||||
this.gracefulShutdownCoordinator = coordinator;
|
||||
}
|
||||
};
|
||||
|
||||
when(executorService.submit(Matchers.<Callable<TaskResult>> any()))
|
||||
.thenAnswer(new ShutdownHandlingAnswer(taskFuture));
|
||||
when(taskFuture.isDone()).thenReturn(true);
|
||||
when(taskFuture.get()).thenReturn(taskResult);
|
||||
|
||||
worker.runProcessLoop();
|
||||
|
||||
verify(executorService, atLeastOnce()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
|
||||
.and(TaskTypeMatcher.isOfType(TaskType.BLOCK_ON_PARENT_SHARDS))));
|
||||
|
||||
worker.runProcessLoop();
|
||||
|
||||
verify(executorService, atLeastOnce()).submit(argThat(
|
||||
both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.INITIALIZE))));
|
||||
|
||||
Future<Boolean> firstFuture = worker.startGracefulShutdown();
|
||||
Future<Boolean> secondFuture = worker.startGracefulShutdown();
|
||||
|
||||
assertThat(firstFuture, equalTo(secondFuture));
|
||||
verify(coordinator).startGracefulShutdown(any(Callable.class));
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRequestShutdownNoLeases() throws Exception {
|
||||
|
||||
|
|
@ -830,7 +970,7 @@ public class WorkerTest {
|
|||
verify(executorService, never()).submit(argThat(
|
||||
both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.INITIALIZE))));
|
||||
|
||||
worker.requestShutdown();
|
||||
worker.createWorkerShutdownCallable().call();
|
||||
worker.runProcessLoop();
|
||||
|
||||
verify(executorService, never()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
|
||||
|
|
@ -909,7 +1049,7 @@ public class WorkerTest {
|
|||
.withField(InitializeTask.class, "shardInfo", equalTo(shardInfo2)))));
|
||||
|
||||
worker.getShardInfoShardConsumerMap().remove(shardInfo2);
|
||||
worker.requestShutdown();
|
||||
worker.createWorkerShutdownCallable().call();
|
||||
leases.remove(1);
|
||||
currentAssignments.remove(1);
|
||||
worker.runProcessLoop();
|
||||
|
|
@ -1194,6 +1334,24 @@ public class WorkerTest {
|
|||
|
||||
}
|
||||
|
||||
private abstract class InjectableWorker extends Worker {
|
||||
InjectableWorker(String applicationName, IRecordProcessorFactory recordProcessorFactory,
|
||||
StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream,
|
||||
long parentShardPollIntervalMillis, long shardSyncIdleTimeMillis,
|
||||
boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint,
|
||||
KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService,
|
||||
IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
|
||||
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) {
|
||||
super(applicationName, recordProcessorFactory, streamConfig, initialPositionInStream,
|
||||
parentShardPollIntervalMillis, shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion,
|
||||
checkpoint, leaseCoordinator, execService, metricsFactory, taskBackoffTimeMillis,
|
||||
failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, shardPrioritization);
|
||||
postConstruct();
|
||||
}
|
||||
|
||||
abstract void postConstruct();
|
||||
}
|
||||
|
||||
private KinesisClientLease makeLease(ExtendedSequenceNumber checkpoint, int shardId) {
|
||||
return new KinesisClientLeaseBuilder().withCheckpoint(checkpoint).withConcurrencyToken(UUID.randomUUID())
|
||||
.withLastCounterIncrementNanos(0L).withLeaseCounter(0L).withOwnerSwitchesSinceCheckpoint(0L)
|
||||
|
|
|
|||
|
|
@ -1,3 +1,17 @@
|
|||
/*
|
||||
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.clientlibrary.proxies;
|
||||
|
||||
import static org.hamcrest.Matchers.both;
|
||||
|
|
|
|||
|
|
@ -1,3 +1,17 @@
|
|||
/*
|
||||
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.clientlibrary.types;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.equalTo;
|
||||
|
|
|
|||
|
|
@ -1,3 +1,17 @@
|
|||
/*
|
||||
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.leases.impl;
|
||||
|
||||
import java.util.HashSet;
|
||||
|
|
|
|||
|
|
@ -1,3 +1,17 @@
|
|||
/*
|
||||
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.multilang;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.equalTo;
|
||||
|
|
|
|||
Loading…
Reference in a new issue