Adding support for timeout for multilang protocol, related to issue 185 (#195)
* Adding timeout to waitForStatusMessage future call. Introducing new config properties timeoutEnabled and timeoutInSeconds. Halting the JVM if timeout is reached. * Adding test cases for halt jvm code. Made the configuration objects for timeout optional. * Addressing code review comments and making appropriate changes.
This commit is contained in:
parent
7d56c4aef1
commit
e8f9ad3f0a
8 changed files with 245 additions and 108 deletions
|
|
@ -15,6 +15,7 @@
|
||||||
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||||
|
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
|
import java.util.Optional;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import org.apache.commons.lang.Validate;
|
import org.apache.commons.lang.Validate;
|
||||||
|
|
@ -213,6 +214,9 @@ public class KinesisClientLibConfiguration {
|
||||||
private boolean skipShardSyncAtWorkerInitializationIfLeasesExist;
|
private boolean skipShardSyncAtWorkerInitializationIfLeasesExist;
|
||||||
private ShardPrioritization shardPrioritization;
|
private ShardPrioritization shardPrioritization;
|
||||||
|
|
||||||
|
@Getter
|
||||||
|
private Optional<Integer> timeoutInSeconds = Optional.empty();
|
||||||
|
|
||||||
@Getter
|
@Getter
|
||||||
private int maxLeaseRenewalThreads = DEFAULT_MAX_LEASE_RENEWAL_THREADS;
|
private int maxLeaseRenewalThreads = DEFAULT_MAX_LEASE_RENEWAL_THREADS;
|
||||||
|
|
||||||
|
|
@ -1092,7 +1096,7 @@ public class KinesisClientLibConfiguration {
|
||||||
* Sets the size of the thread pool that will be used to renew leases.
|
* Sets the size of the thread pool that will be used to renew leases.
|
||||||
*
|
*
|
||||||
* Setting this to low may starve the lease renewal process, and cause the worker to lose leases at a higher rate.
|
* Setting this to low may starve the lease renewal process, and cause the worker to lose leases at a higher rate.
|
||||||
*
|
*
|
||||||
* @param maxLeaseRenewalThreads
|
* @param maxLeaseRenewalThreads
|
||||||
* the maximum size of the lease renewal thread pool
|
* the maximum size of the lease renewal thread pool
|
||||||
* @throws IllegalArgumentException
|
* @throws IllegalArgumentException
|
||||||
|
|
@ -1106,4 +1110,12 @@ public class KinesisClientLibConfiguration {
|
||||||
|
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param timeoutInSeconds The timeout in seconds to wait for the MultiLangProtocol to wait for
|
||||||
|
*/
|
||||||
|
public void withTimeoutInSeconds(final int timeoutInSeconds) {
|
||||||
|
this.timeoutInSeconds = Optional.of(timeoutInSeconds);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -100,7 +100,7 @@ public class MultiLangDaemonConfig {
|
||||||
|
|
||||||
kinesisClientLibConfig = configurator.getConfiguration(properties);
|
kinesisClientLibConfig = configurator.getConfiguration(properties);
|
||||||
executorService = buildExecutorService(properties);
|
executorService = buildExecutorService(properties);
|
||||||
recordProcessorFactory = new MultiLangRecordProcessorFactory(executableName, executorService);
|
recordProcessorFactory = new MultiLangRecordProcessorFactory(executableName, executorService, kinesisClientLibConfig);
|
||||||
|
|
||||||
LOG.info("Running " + kinesisClientLibConfig.getApplicationName() + " to process stream "
|
LOG.info("Running " + kinesisClientLibConfig.getApplicationName() + " to process stream "
|
||||||
+ kinesisClientLibConfig.getStreamName() + " with executable " + executableName);
|
+ kinesisClientLibConfig.getStreamName() + " with executable " + executableName);
|
||||||
|
|
|
||||||
|
|
@ -14,11 +14,9 @@
|
||||||
*/
|
*/
|
||||||
package com.amazonaws.services.kinesis.multilang;
|
package com.amazonaws.services.kinesis.multilang;
|
||||||
|
|
||||||
import java.util.concurrent.ExecutionException;
|
|
||||||
import java.util.concurrent.Future;
|
|
||||||
|
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
|
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
|
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
|
||||||
|
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
|
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
|
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
|
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
|
||||||
|
|
@ -29,9 +27,14 @@ import com.amazonaws.services.kinesis.multilang.messages.ProcessRecordsMessage;
|
||||||
import com.amazonaws.services.kinesis.multilang.messages.ShutdownMessage;
|
import com.amazonaws.services.kinesis.multilang.messages.ShutdownMessage;
|
||||||
import com.amazonaws.services.kinesis.multilang.messages.ShutdownRequestedMessage;
|
import com.amazonaws.services.kinesis.multilang.messages.ShutdownRequestedMessage;
|
||||||
import com.amazonaws.services.kinesis.multilang.messages.StatusMessage;
|
import com.amazonaws.services.kinesis.multilang.messages.StatusMessage;
|
||||||
|
|
||||||
import lombok.extern.apachecommons.CommonsLog;
|
import lombok.extern.apachecommons.CommonsLog;
|
||||||
|
|
||||||
|
import java.util.Optional;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An implementation of the multi language protocol.
|
* An implementation of the multi language protocol.
|
||||||
*/
|
*/
|
||||||
|
|
@ -41,10 +44,11 @@ class MultiLangProtocol {
|
||||||
private MessageReader messageReader;
|
private MessageReader messageReader;
|
||||||
private MessageWriter messageWriter;
|
private MessageWriter messageWriter;
|
||||||
private final InitializationInput initializationInput;
|
private final InitializationInput initializationInput;
|
||||||
|
private KinesisClientLibConfiguration configuration;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor.
|
* Constructor.
|
||||||
*
|
*
|
||||||
* @param messageReader
|
* @param messageReader
|
||||||
* A message reader.
|
* A message reader.
|
||||||
* @param messageWriter
|
* @param messageWriter
|
||||||
|
|
@ -53,16 +57,17 @@ class MultiLangProtocol {
|
||||||
* information about the shard this processor is starting to process
|
* information about the shard this processor is starting to process
|
||||||
*/
|
*/
|
||||||
MultiLangProtocol(MessageReader messageReader, MessageWriter messageWriter,
|
MultiLangProtocol(MessageReader messageReader, MessageWriter messageWriter,
|
||||||
InitializationInput initializationInput) {
|
InitializationInput initializationInput, KinesisClientLibConfiguration configuration) {
|
||||||
this.messageReader = messageReader;
|
this.messageReader = messageReader;
|
||||||
this.messageWriter = messageWriter;
|
this.messageWriter = messageWriter;
|
||||||
this.initializationInput = initializationInput;
|
this.initializationInput = initializationInput;
|
||||||
|
this.configuration = configuration;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Writes an {@link InitializeMessage} to the child process's STDIN and waits for the child process to respond with
|
* Writes an {@link InitializeMessage} to the child process's STDIN and waits for the child process to respond with
|
||||||
* a {@link StatusMessage} on its STDOUT.
|
* a {@link StatusMessage} on its STDOUT.
|
||||||
*
|
*
|
||||||
* @return Whether or not this operation succeeded.
|
* @return Whether or not this operation succeeded.
|
||||||
*/
|
*/
|
||||||
boolean initialize() {
|
boolean initialize() {
|
||||||
|
|
@ -77,7 +82,7 @@ class MultiLangProtocol {
|
||||||
/**
|
/**
|
||||||
* Writes a {@link ProcessRecordsMessage} to the child process's STDIN and waits for the child process to respond
|
* Writes a {@link ProcessRecordsMessage} to the child process's STDIN and waits for the child process to respond
|
||||||
* with a {@link StatusMessage} on its STDOUT.
|
* with a {@link StatusMessage} on its STDOUT.
|
||||||
*
|
*
|
||||||
* @param processRecordsInput
|
* @param processRecordsInput
|
||||||
* The records, and associated metadata, to process.
|
* The records, and associated metadata, to process.
|
||||||
* @return Whether or not this operation succeeded.
|
* @return Whether or not this operation succeeded.
|
||||||
|
|
@ -90,7 +95,7 @@ class MultiLangProtocol {
|
||||||
/**
|
/**
|
||||||
* Writes a {@link ShutdownMessage} to the child process's STDIN and waits for the child process to respond with a
|
* Writes a {@link ShutdownMessage} to the child process's STDIN and waits for the child process to respond with a
|
||||||
* {@link StatusMessage} on its STDOUT.
|
* {@link StatusMessage} on its STDOUT.
|
||||||
*
|
*
|
||||||
* @param checkpointer A checkpointer.
|
* @param checkpointer A checkpointer.
|
||||||
* @param reason Why this processor is being shutdown.
|
* @param reason Why this processor is being shutdown.
|
||||||
* @return Whether or not this operation succeeded.
|
* @return Whether or not this operation succeeded.
|
||||||
|
|
@ -119,7 +124,7 @@ class MultiLangProtocol {
|
||||||
* all communications with the child process regarding checkpointing were successful. Note that whether or not the
|
* all communications with the child process regarding checkpointing were successful. Note that whether or not the
|
||||||
* checkpointing itself was successful is not the concern of this method. This method simply cares whether it was
|
* checkpointing itself was successful is not the concern of this method. This method simply cares whether it was
|
||||||
* able to successfully communicate the results of its attempts to checkpoint.
|
* able to successfully communicate the results of its attempts to checkpoint.
|
||||||
*
|
*
|
||||||
* @param action
|
* @param action
|
||||||
* What action is being waited on.
|
* What action is being waited on.
|
||||||
* @param checkpointer
|
* @param checkpointer
|
||||||
|
|
@ -150,44 +155,75 @@ class MultiLangProtocol {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Waits for status message and verifies it against the expectation
|
* Waits for status message and verifies it against the expectation
|
||||||
*
|
*
|
||||||
* @param action
|
* @param action
|
||||||
* What action is being waited on.
|
* What action is being waited on.
|
||||||
* @param checkpointer
|
* @param checkpointer
|
||||||
* the original process records request
|
* the original process records request
|
||||||
* @return Whether or not this operation succeeded.
|
* @return Whether or not this operation succeeded.
|
||||||
*/
|
*/
|
||||||
private boolean waitForStatusMessage(String action, IRecordProcessorCheckpointer checkpointer) {
|
boolean waitForStatusMessage(String action, IRecordProcessorCheckpointer checkpointer) {
|
||||||
StatusMessage statusMessage = null;
|
Optional<StatusMessage> statusMessage = Optional.empty();
|
||||||
while (statusMessage == null) {
|
while (!statusMessage.isPresent()) {
|
||||||
Future<Message> future = this.messageReader.getNextMessageFromSTDOUT();
|
Future<Message> future = this.messageReader.getNextMessageFromSTDOUT();
|
||||||
try {
|
Optional<Message> message = configuration.getTimeoutInSeconds()
|
||||||
Message message = future.get();
|
.map(second -> futureMethod(() -> future.get(second, TimeUnit.SECONDS), action))
|
||||||
// Note that instanceof doubles as a check against a value being null
|
.orElse(futureMethod(future::get, action));
|
||||||
if (message instanceof CheckpointMessage) {
|
|
||||||
boolean checkpointWriteSucceeded = checkpoint((CheckpointMessage) message, checkpointer).get();
|
if (!message.isPresent()) {
|
||||||
if (!checkpointWriteSucceeded) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
} else if (message instanceof StatusMessage) {
|
|
||||||
statusMessage = (StatusMessage) message;
|
|
||||||
}
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
log.error(String.format("Interrupted while waiting for %s message for shard %s", action,
|
|
||||||
initializationInput.getShardId()));
|
|
||||||
return false;
|
|
||||||
} catch (ExecutionException e) {
|
|
||||||
log.error(String.format("Failed to get status message for %s action for shard %s", action,
|
|
||||||
initializationInput.getShardId()), e);
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Optional<Boolean> checkpointFailed = message.filter(m -> m instanceof CheckpointMessage )
|
||||||
|
.map(m -> (CheckpointMessage) m)
|
||||||
|
.flatMap(m -> futureMethod(() -> checkpoint(m, checkpointer).get(), "Checkpoint"))
|
||||||
|
.map(checkpointSuccess -> !checkpointSuccess);
|
||||||
|
|
||||||
|
if (checkpointFailed.orElse(false)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
statusMessage = message.filter(m -> m instanceof StatusMessage).map(m -> (StatusMessage) m );
|
||||||
}
|
}
|
||||||
return this.validateStatusMessage(statusMessage, action);
|
return this.validateStatusMessage(statusMessage.get(), action);
|
||||||
|
}
|
||||||
|
|
||||||
|
private interface FutureMethod<T> {
|
||||||
|
T get() throws InterruptedException, TimeoutException, ExecutionException;
|
||||||
|
}
|
||||||
|
|
||||||
|
private <T> Optional<T> futureMethod(FutureMethod<T> fm, String action) {
|
||||||
|
try {
|
||||||
|
return Optional.of(fm.get());
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
log.error(String.format("Interrupted while waiting for %s message for shard %s", action,
|
||||||
|
initializationInput.getShardId()), e);
|
||||||
|
} catch (ExecutionException e) {
|
||||||
|
log.error(String.format("Failed to get status message for %s action for shard %s", action,
|
||||||
|
initializationInput.getShardId()), e);
|
||||||
|
} catch (TimeoutException e) {
|
||||||
|
log.error(String.format("Timedout to get status message for %s action for shard %s. Terminating...",
|
||||||
|
action,
|
||||||
|
initializationInput.getShardId()),
|
||||||
|
e);
|
||||||
|
haltJvm(1);
|
||||||
|
}
|
||||||
|
return Optional.empty();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This method is used to halt the JVM. Use this method with utmost caution, since this method will kill the JVM
|
||||||
|
* without calling the Shutdown hooks.
|
||||||
|
*
|
||||||
|
* @param exitStatus The exit status with which the JVM is to be halted.
|
||||||
|
*/
|
||||||
|
protected void haltJvm(int exitStatus) {
|
||||||
|
Runtime.getRuntime().halt(exitStatus);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Utility for confirming that the status message is for the provided action.
|
* Utility for confirming that the status message is for the provided action.
|
||||||
*
|
*
|
||||||
* @param statusMessage The status of the child process.
|
* @param statusMessage The status of the child process.
|
||||||
* @param action The action that was being waited on.
|
* @param action The action that was being waited on.
|
||||||
* @return Whether or not this operation succeeded.
|
* @return Whether or not this operation succeeded.
|
||||||
|
|
@ -205,7 +241,7 @@ class MultiLangProtocol {
|
||||||
* provided {@link CheckpointMessage}. If no sequence number is provided, i.e. the sequence number is null, then
|
* provided {@link CheckpointMessage}. If no sequence number is provided, i.e. the sequence number is null, then
|
||||||
* this method will call {@link IRecordProcessorCheckpointer#checkpoint()}. The method returns a future representing
|
* this method will call {@link IRecordProcessorCheckpointer#checkpoint()}. The method returns a future representing
|
||||||
* the attempt to write the result of this checkpoint attempt to the child process.
|
* the attempt to write the result of this checkpoint attempt to the child process.
|
||||||
*
|
*
|
||||||
* @param checkpointMessage A checkpoint message.
|
* @param checkpointMessage A checkpoint message.
|
||||||
* @param checkpointer A checkpointer.
|
* @param checkpointer A checkpointer.
|
||||||
* @return Whether or not this operation succeeded.
|
* @return Whether or not this operation succeeded.
|
||||||
|
|
|
||||||
|
|
@ -24,6 +24,7 @@ import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateExcep
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
|
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
|
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
|
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
|
||||||
|
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
|
|
@ -65,6 +66,8 @@ public class MultiLangRecordProcessor implements IRecordProcessor, IShutdownNoti
|
||||||
|
|
||||||
private MultiLangProtocol protocol;
|
private MultiLangProtocol protocol;
|
||||||
|
|
||||||
|
private KinesisClientLibConfiguration configuration;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void initialize(InitializationInput initializationInput) {
|
public void initialize(InitializationInput initializationInput) {
|
||||||
try {
|
try {
|
||||||
|
|
@ -87,7 +90,7 @@ public class MultiLangRecordProcessor implements IRecordProcessor, IShutdownNoti
|
||||||
// Submit the error reader for execution
|
// Submit the error reader for execution
|
||||||
stderrReadTask = executorService.submit(readSTDERRTask);
|
stderrReadTask = executorService.submit(readSTDERRTask);
|
||||||
|
|
||||||
protocol = new MultiLangProtocol(messageReader, messageWriter, initializationInput);
|
protocol = new MultiLangProtocol(messageReader, messageWriter, initializationInput, configuration);
|
||||||
if (!protocol.initialize()) {
|
if (!protocol.initialize()) {
|
||||||
throw new RuntimeException("Failed to initialize child process");
|
throw new RuntimeException("Failed to initialize child process");
|
||||||
}
|
}
|
||||||
|
|
@ -173,9 +176,9 @@ public class MultiLangRecordProcessor implements IRecordProcessor, IShutdownNoti
|
||||||
* An obejct mapper.
|
* An obejct mapper.
|
||||||
*/
|
*/
|
||||||
MultiLangRecordProcessor(ProcessBuilder processBuilder, ExecutorService executorService,
|
MultiLangRecordProcessor(ProcessBuilder processBuilder, ExecutorService executorService,
|
||||||
ObjectMapper objectMapper) {
|
ObjectMapper objectMapper, KinesisClientLibConfiguration configuration) {
|
||||||
this(processBuilder, executorService, objectMapper, new MessageWriter(), new MessageReader(),
|
this(processBuilder, executorService, objectMapper, new MessageWriter(), new MessageReader(),
|
||||||
new DrainChildSTDERRTask());
|
new DrainChildSTDERRTask(), configuration);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -195,13 +198,16 @@ public class MultiLangRecordProcessor implements IRecordProcessor, IShutdownNoti
|
||||||
* Error reader to read from child process's stderr
|
* Error reader to read from child process's stderr
|
||||||
*/
|
*/
|
||||||
MultiLangRecordProcessor(ProcessBuilder processBuilder, ExecutorService executorService, ObjectMapper objectMapper,
|
MultiLangRecordProcessor(ProcessBuilder processBuilder, ExecutorService executorService, ObjectMapper objectMapper,
|
||||||
MessageWriter messageWriter, MessageReader messageReader, DrainChildSTDERRTask readSTDERRTask) {
|
MessageWriter messageWriter, MessageReader messageReader, DrainChildSTDERRTask readSTDERRTask,
|
||||||
|
KinesisClientLibConfiguration configuration) {
|
||||||
this.executorService = executorService;
|
this.executorService = executorService;
|
||||||
this.processBuilder = processBuilder;
|
this.processBuilder = processBuilder;
|
||||||
this.objectMapper = objectMapper;
|
this.objectMapper = objectMapper;
|
||||||
this.messageWriter = messageWriter;
|
this.messageWriter = messageWriter;
|
||||||
this.messageReader = messageReader;
|
this.messageReader = messageReader;
|
||||||
this.readSTDERRTask = readSTDERRTask;
|
this.readSTDERRTask = readSTDERRTask;
|
||||||
|
this.configuration = configuration;
|
||||||
|
|
||||||
|
|
||||||
this.state = ProcessState.ACTIVE;
|
this.state = ProcessState.ACTIVE;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -16,6 +16,7 @@ package com.amazonaws.services.kinesis.multilang;
|
||||||
|
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
|
|
||||||
|
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
|
|
@ -39,12 +40,15 @@ public class MultiLangRecordProcessorFactory implements IRecordProcessorFactory
|
||||||
|
|
||||||
private final ExecutorService executorService;
|
private final ExecutorService executorService;
|
||||||
|
|
||||||
|
private final KinesisClientLibConfiguration configuration;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param command The command that will do processing for this factory's record processors.
|
* @param command The command that will do processing for this factory's record processors.
|
||||||
* @param executorService An executor service to use while processing inputs and outputs of the child process.
|
* @param executorService An executor service to use while processing inputs and outputs of the child process.
|
||||||
*/
|
*/
|
||||||
public MultiLangRecordProcessorFactory(String command, ExecutorService executorService) {
|
public MultiLangRecordProcessorFactory(String command, ExecutorService executorService,
|
||||||
this(command, executorService, new ObjectMapper());
|
KinesisClientLibConfiguration configuration) {
|
||||||
|
this(command, executorService, new ObjectMapper(), configuration);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -52,11 +56,13 @@ public class MultiLangRecordProcessorFactory implements IRecordProcessorFactory
|
||||||
* @param executorService An executor service to use while processing inputs and outputs of the child process.
|
* @param executorService An executor service to use while processing inputs and outputs of the child process.
|
||||||
* @param objectMapper An object mapper used to convert messages to json to be written to the child process
|
* @param objectMapper An object mapper used to convert messages to json to be written to the child process
|
||||||
*/
|
*/
|
||||||
public MultiLangRecordProcessorFactory(String command, ExecutorService executorService, ObjectMapper objectMapper) {
|
public MultiLangRecordProcessorFactory(String command, ExecutorService executorService, ObjectMapper objectMapper,
|
||||||
|
KinesisClientLibConfiguration configuration) {
|
||||||
this.command = command;
|
this.command = command;
|
||||||
this.commandArray = command.split(COMMAND_DELIMETER_REGEX);
|
this.commandArray = command.split(COMMAND_DELIMETER_REGEX);
|
||||||
this.executorService = executorService;
|
this.executorService = executorService;
|
||||||
this.objectMapper = objectMapper;
|
this.objectMapper = objectMapper;
|
||||||
|
this.configuration = configuration;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
@ -65,7 +71,8 @@ public class MultiLangRecordProcessorFactory implements IRecordProcessorFactory
|
||||||
/*
|
/*
|
||||||
* Giving ProcessBuilder the command as an array of Strings allows users to specify command line arguments.
|
* Giving ProcessBuilder the command as an array of Strings allows users to specify command line arguments.
|
||||||
*/
|
*/
|
||||||
return new MultiLangRecordProcessor(new ProcessBuilder(commandArray), executorService, this.objectMapper);
|
return new MultiLangRecordProcessor(new ProcessBuilder(commandArray), executorService, this.objectMapper,
|
||||||
|
this.configuration);
|
||||||
}
|
}
|
||||||
|
|
||||||
String[] getCommandArray() {
|
String[] getCommandArray() {
|
||||||
|
|
|
||||||
|
|
@ -14,64 +14,78 @@
|
||||||
*/
|
*/
|
||||||
package com.amazonaws.services.kinesis.multilang;
|
package com.amazonaws.services.kinesis.multilang;
|
||||||
|
|
||||||
import static org.hamcrest.CoreMatchers.equalTo;
|
|
||||||
import static org.junit.Assert.assertThat;
|
|
||||||
import static org.mockito.Matchers.any;
|
|
||||||
import static org.mockito.Matchers.anyLong;
|
|
||||||
import static org.mockito.Matchers.anyString;
|
|
||||||
import static org.mockito.Matchers.argThat;
|
|
||||||
import static org.mockito.Mockito.timeout;
|
|
||||||
import static org.mockito.Mockito.verify;
|
|
||||||
import static org.mockito.Mockito.when;
|
|
||||||
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.concurrent.ExecutionException;
|
|
||||||
import java.util.concurrent.Future;
|
|
||||||
|
|
||||||
import org.junit.Before;
|
|
||||||
import org.junit.Test;
|
|
||||||
import org.mockito.Mockito;
|
|
||||||
import org.mockito.invocation.InvocationOnMock;
|
|
||||||
import org.mockito.stubbing.Answer;
|
|
||||||
|
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
|
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException;
|
import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException;
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
|
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
|
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
|
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
|
||||||
|
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
|
||||||
|
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
|
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
|
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
|
|
||||||
|
|
||||||
import com.amazonaws.services.kinesis.model.Record;
|
import com.amazonaws.services.kinesis.model.Record;
|
||||||
import com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage;
|
import com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage;
|
||||||
import com.amazonaws.services.kinesis.multilang.messages.Message;
|
import com.amazonaws.services.kinesis.multilang.messages.Message;
|
||||||
import com.amazonaws.services.kinesis.multilang.messages.ProcessRecordsMessage;
|
import com.amazonaws.services.kinesis.multilang.messages.ProcessRecordsMessage;
|
||||||
import com.amazonaws.services.kinesis.multilang.messages.StatusMessage;
|
import com.amazonaws.services.kinesis.multilang.messages.StatusMessage;
|
||||||
import com.google.common.util.concurrent.SettableFuture;
|
import com.google.common.util.concurrent.SettableFuture;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.mockito.Mock;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
|
import org.mockito.runners.MockitoJUnitRunner;
|
||||||
|
import org.mockito.stubbing.Answer;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Optional;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
|
import static org.hamcrest.CoreMatchers.equalTo;
|
||||||
|
import static org.junit.Assert.assertThat;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.mockito.Matchers.any;
|
||||||
|
import static org.mockito.Matchers.anyInt;
|
||||||
|
import static org.mockito.Matchers.anyLong;
|
||||||
|
import static org.mockito.Matchers.anyString;
|
||||||
|
import static org.mockito.Matchers.argThat;
|
||||||
|
import static org.mockito.Matchers.eq;
|
||||||
|
import static org.mockito.Mockito.timeout;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
@RunWith(MockitoJUnitRunner.class)
|
||||||
public class MultiLangProtocolTest {
|
public class MultiLangProtocolTest {
|
||||||
|
|
||||||
private static final List<Record> EMPTY_RECORD_LIST = Collections.emptyList();
|
private static final List<Record> EMPTY_RECORD_LIST = Collections.emptyList();
|
||||||
|
@Mock
|
||||||
private MultiLangProtocol protocol;
|
private MultiLangProtocol protocol;
|
||||||
|
@Mock
|
||||||
private MessageWriter messageWriter;
|
private MessageWriter messageWriter;
|
||||||
|
@Mock
|
||||||
private MessageReader messageReader;
|
private MessageReader messageReader;
|
||||||
private String shardId;
|
private String shardId;
|
||||||
|
@Mock
|
||||||
private IRecordProcessorCheckpointer checkpointer;
|
private IRecordProcessorCheckpointer checkpointer;
|
||||||
|
@Mock
|
||||||
|
private KinesisClientLibConfiguration configuration;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setup() {
|
public void setup() {
|
||||||
this.shardId = "shard-id-123";
|
this.shardId = "shard-id-123";
|
||||||
messageWriter = Mockito.mock(MessageWriter.class);
|
protocol = new MultiLangProtocolForTesting(messageReader, messageWriter,
|
||||||
messageReader = Mockito.mock(MessageReader.class);
|
new InitializationInput().withShardId(shardId), configuration);
|
||||||
protocol = new MultiLangProtocol(messageReader, messageWriter, new InitializationInput().withShardId(shardId));
|
|
||||||
checkpointer = Mockito.mock(IRecordProcessorCheckpointer.class);
|
when(configuration.getTimeoutInSeconds()).thenReturn(Optional.empty());
|
||||||
}
|
}
|
||||||
|
|
||||||
private <T> Future<T> buildFuture(T value) {
|
private <T> Future<T> buildFuture(T value) {
|
||||||
|
|
@ -165,7 +179,10 @@ public class MultiLangProtocolTest {
|
||||||
this.add(new StatusMessage("processRecords"));
|
this.add(new StatusMessage("processRecords"));
|
||||||
}
|
}
|
||||||
}));
|
}));
|
||||||
assertThat(protocol.processRecords(new ProcessRecordsInput().withRecords(EMPTY_RECORD_LIST).withCheckpointer(checkpointer)), equalTo(true));
|
|
||||||
|
boolean result = protocol.processRecords(new ProcessRecordsInput().withRecords(EMPTY_RECORD_LIST).withCheckpointer(checkpointer));
|
||||||
|
|
||||||
|
assertThat(result, equalTo(true));
|
||||||
|
|
||||||
verify(checkpointer, timeout(1)).checkpoint();
|
verify(checkpointer, timeout(1)).checkpoint();
|
||||||
verify(checkpointer, timeout(1)).checkpoint("123", 0L);
|
verify(checkpointer, timeout(1)).checkpoint("123", 0L);
|
||||||
|
|
@ -183,4 +200,50 @@ public class MultiLangProtocolTest {
|
||||||
}));
|
}));
|
||||||
assertThat(protocol.processRecords(new ProcessRecordsInput().withRecords(EMPTY_RECORD_LIST).withCheckpointer(checkpointer)), equalTo(false));
|
assertThat(protocol.processRecords(new ProcessRecordsInput().withRecords(EMPTY_RECORD_LIST).withCheckpointer(checkpointer)), equalTo(false));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(expected = NullPointerException.class)
|
||||||
|
public void waitForStatusMessageTimeoutTest() throws InterruptedException, TimeoutException, ExecutionException {
|
||||||
|
when(messageWriter.writeProcessRecordsMessage(any(ProcessRecordsInput.class))).thenReturn(buildFuture(true));
|
||||||
|
Future<Message> future = Mockito.mock(Future.class);
|
||||||
|
when(messageReader.getNextMessageFromSTDOUT()).thenReturn(future);
|
||||||
|
when(configuration.getTimeoutInSeconds()).thenReturn(Optional.of(5));
|
||||||
|
when(future.get(anyInt(), eq(TimeUnit.SECONDS))).thenThrow(TimeoutException.class);
|
||||||
|
protocol = new MultiLangProtocolForTesting(messageReader,
|
||||||
|
messageWriter,
|
||||||
|
new InitializationInput().withShardId(shardId),
|
||||||
|
configuration);
|
||||||
|
|
||||||
|
protocol.processRecords(new ProcessRecordsInput().withRecords(EMPTY_RECORD_LIST));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void waitForStatusMessageSuccessTest() {
|
||||||
|
when(messageWriter.writeProcessRecordsMessage(any(ProcessRecordsInput.class))).thenReturn(buildFuture(true));
|
||||||
|
when(messageReader.getNextMessageFromSTDOUT()).thenReturn(buildFuture(new StatusMessage("processRecords"), Message.class));
|
||||||
|
when(configuration.getTimeoutInSeconds()).thenReturn(Optional.of(5));
|
||||||
|
|
||||||
|
assertTrue(protocol.processRecords(new ProcessRecordsInput().withRecords(EMPTY_RECORD_LIST)));
|
||||||
|
}
|
||||||
|
|
||||||
|
private class MultiLangProtocolForTesting extends MultiLangProtocol {
|
||||||
|
/**
|
||||||
|
* Constructor.
|
||||||
|
*
|
||||||
|
* @param messageReader A message reader.
|
||||||
|
* @param messageWriter A message writer.
|
||||||
|
* @param initializationInput
|
||||||
|
* @param configuration
|
||||||
|
*/
|
||||||
|
MultiLangProtocolForTesting(final MessageReader messageReader,
|
||||||
|
final MessageWriter messageWriter,
|
||||||
|
final InitializationInput initializationInput,
|
||||||
|
final KinesisClientLibConfiguration configuration) {
|
||||||
|
super(messageReader, messageWriter, initializationInput, configuration);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void haltJvm(final int exitStatus) {
|
||||||
|
throw new NullPointerException();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -14,16 +14,24 @@
|
||||||
*/
|
*/
|
||||||
package com.amazonaws.services.kinesis.multilang;
|
package com.amazonaws.services.kinesis.multilang;
|
||||||
|
|
||||||
|
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
|
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.mockito.Mock;
|
||||||
|
import org.mockito.runners.MockitoJUnitRunner;
|
||||||
|
|
||||||
|
@RunWith(MockitoJUnitRunner.class)
|
||||||
public class StreamingRecordProcessorFactoryTest {
|
public class StreamingRecordProcessorFactoryTest {
|
||||||
|
|
||||||
|
@Mock
|
||||||
|
private KinesisClientLibConfiguration configuration;
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void createProcessorTest() {
|
public void createProcessorTest() {
|
||||||
MultiLangRecordProcessorFactory factory = new MultiLangRecordProcessorFactory("somecommand", null);
|
MultiLangRecordProcessorFactory factory = new MultiLangRecordProcessorFactory("somecommand", null, configuration);
|
||||||
IRecordProcessor processor = factory.createProcessor();
|
IRecordProcessor processor = factory.createProcessor();
|
||||||
|
|
||||||
Assert.assertEquals("Should have constructed a StreamingRecordProcessor", MultiLangRecordProcessor.class,
|
Assert.assertEquals("Should have constructed a StreamingRecordProcessor", MultiLangRecordProcessor.class,
|
||||||
|
|
|
||||||
|
|
@ -14,41 +14,13 @@
|
||||||
*/
|
*/
|
||||||
package com.amazonaws.services.kinesis.multilang;
|
package com.amazonaws.services.kinesis.multilang;
|
||||||
|
|
||||||
import static org.mockito.Matchers.any;
|
|
||||||
import static org.mockito.Matchers.anyLong;
|
|
||||||
import static org.mockito.Matchers.anyString;
|
|
||||||
import static org.mockito.Matchers.argThat;
|
|
||||||
import static org.mockito.Mockito.never;
|
|
||||||
import static org.mockito.Mockito.times;
|
|
||||||
import static org.mockito.Mockito.verify;
|
|
||||||
import static org.mockito.Mockito.when;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.InputStream;
|
|
||||||
import java.io.OutputStream;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.concurrent.ExecutionException;
|
|
||||||
import java.util.concurrent.ExecutorService;
|
|
||||||
import java.util.concurrent.Executors;
|
|
||||||
import java.util.concurrent.Future;
|
|
||||||
|
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
|
|
||||||
import org.junit.Assert;
|
|
||||||
import org.junit.Before;
|
|
||||||
import org.junit.Test;
|
|
||||||
import org.junit.runner.RunWith;
|
|
||||||
import org.mockito.Mock;
|
|
||||||
import org.mockito.Mockito;
|
|
||||||
import org.mockito.invocation.InvocationOnMock;
|
|
||||||
import org.mockito.runners.MockitoJUnitRunner;
|
|
||||||
import org.mockito.stubbing.Answer;
|
|
||||||
|
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
|
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException;
|
import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException;
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
|
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
|
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
|
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
|
||||||
|
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
|
||||||
|
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
|
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
|
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
|
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
|
||||||
|
|
@ -59,6 +31,35 @@ import com.amazonaws.services.kinesis.multilang.messages.ProcessRecordsMessage;
|
||||||
import com.amazonaws.services.kinesis.multilang.messages.ShutdownMessage;
|
import com.amazonaws.services.kinesis.multilang.messages.ShutdownMessage;
|
||||||
import com.amazonaws.services.kinesis.multilang.messages.StatusMessage;
|
import com.amazonaws.services.kinesis.multilang.messages.StatusMessage;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.mockito.Mock;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
|
import org.mockito.runners.MockitoJUnitRunner;
|
||||||
|
import org.mockito.stubbing.Answer;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.io.OutputStream;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Optional;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
|
||||||
|
import static org.mockito.Matchers.any;
|
||||||
|
import static org.mockito.Matchers.anyLong;
|
||||||
|
import static org.mockito.Matchers.anyString;
|
||||||
|
import static org.mockito.Matchers.argThat;
|
||||||
|
import static org.mockito.Mockito.never;
|
||||||
|
import static org.mockito.Mockito.times;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
@RunWith(MockitoJUnitRunner.class)
|
@RunWith(MockitoJUnitRunner.class)
|
||||||
public class StreamingRecordProcessorTest {
|
public class StreamingRecordProcessorTest {
|
||||||
|
|
@ -108,6 +109,9 @@ public class StreamingRecordProcessorTest {
|
||||||
|
|
||||||
private MultiLangRecordProcessor recordProcessor;
|
private MultiLangRecordProcessor recordProcessor;
|
||||||
|
|
||||||
|
@Mock
|
||||||
|
private KinesisClientLibConfiguration configuration;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void prepare() throws IOException, InterruptedException, ExecutionException {
|
public void prepare() throws IOException, InterruptedException, ExecutionException {
|
||||||
// Fake command
|
// Fake command
|
||||||
|
|
@ -121,10 +125,11 @@ public class StreamingRecordProcessorTest {
|
||||||
messageWriter = Mockito.mock(MessageWriter.class);
|
messageWriter = Mockito.mock(MessageWriter.class);
|
||||||
messageReader = Mockito.mock(MessageReader.class);
|
messageReader = Mockito.mock(MessageReader.class);
|
||||||
errorReader = Mockito.mock(DrainChildSTDERRTask.class);
|
errorReader = Mockito.mock(DrainChildSTDERRTask.class);
|
||||||
|
when(configuration.getTimeoutInSeconds()).thenReturn(Optional.empty());
|
||||||
|
|
||||||
recordProcessor =
|
recordProcessor =
|
||||||
new MultiLangRecordProcessor(new ProcessBuilder(), executor, new ObjectMapper(), messageWriter,
|
new MultiLangRecordProcessor(new ProcessBuilder(), executor, new ObjectMapper(), messageWriter,
|
||||||
messageReader, errorReader) {
|
messageReader, errorReader, configuration) {
|
||||||
|
|
||||||
// Just don't do anything when we exit.
|
// Just don't do anything when we exit.
|
||||||
void exit() {
|
void exit() {
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue