Adding timeout to waitForStatusMessage future call. Introducing new config properties timeoutEnabled and timeoutInSeconds. Halting the JVM if timeout is reached.
This commit is contained in:
parent
4c839a9d43
commit
644a55bccb
8 changed files with 89 additions and 14 deletions
|
|
@ -203,6 +203,8 @@ public class KinesisClientLibConfiguration {
|
|||
// This is useful for optimizing deployments to large fleets working on a stable stream.
|
||||
private boolean skipShardSyncAtWorkerInitializationIfLeasesExist;
|
||||
private ShardPrioritization shardPrioritization;
|
||||
private boolean timeoutEnabled;
|
||||
private int timeoutInSeconds;
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
|
|
@ -1075,4 +1077,32 @@ public class KinesisClientLibConfiguration {
|
|||
this.shardPrioritization = shardPrioritization;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param timeoutEnabled Enable or disbale MultiLangProtocol to wait for the records to be processed
|
||||
*/
|
||||
public void withTimeoutEnabled(final boolean timeoutEnabled) {
|
||||
this.timeoutEnabled = timeoutEnabled;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return If timeout is enabled for MultiLangProtocol to wait for records to be processed
|
||||
*/
|
||||
public boolean isTimeoutEnabled() {
|
||||
return timeoutEnabled;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param timeoutInSeconds The timeout in seconds to wait for the MultiLangProtocol to wait for
|
||||
*/
|
||||
public void withTimeoutInSeconds(final int timeoutInSeconds) {
|
||||
this.timeoutInSeconds = timeoutInSeconds;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Time for MultiLangProtocol to wait to get response, before throwing an exception.
|
||||
*/
|
||||
public int getTimeoutInSeconds() {
|
||||
return timeoutInSeconds;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -100,7 +100,7 @@ public class MultiLangDaemonConfig {
|
|||
|
||||
kinesisClientLibConfig = configurator.getConfiguration(properties);
|
||||
executorService = buildExecutorService(properties);
|
||||
recordProcessorFactory = new MultiLangRecordProcessorFactory(executableName, executorService);
|
||||
recordProcessorFactory = new MultiLangRecordProcessorFactory(executableName, executorService, kinesisClientLibConfig);
|
||||
|
||||
LOG.info("Running " + kinesisClientLibConfig.getApplicationName() + " to process stream "
|
||||
+ kinesisClientLibConfig.getStreamName() + " with executable " + executableName);
|
||||
|
|
|
|||
|
|
@ -16,9 +16,12 @@ package com.amazonaws.services.kinesis.multilang;
|
|||
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
|
||||
|
|
@ -41,6 +44,7 @@ class MultiLangProtocol {
|
|||
private MessageReader messageReader;
|
||||
private MessageWriter messageWriter;
|
||||
private final InitializationInput initializationInput;
|
||||
private KinesisClientLibConfiguration configuration;
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
|
|
@ -53,10 +57,11 @@ class MultiLangProtocol {
|
|||
* information about the shard this processor is starting to process
|
||||
*/
|
||||
MultiLangProtocol(MessageReader messageReader, MessageWriter messageWriter,
|
||||
InitializationInput initializationInput) {
|
||||
InitializationInput initializationInput, KinesisClientLibConfiguration configuration) {
|
||||
this.messageReader = messageReader;
|
||||
this.messageWriter = messageWriter;
|
||||
this.initializationInput = initializationInput;
|
||||
this.configuration = configuration;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -162,7 +167,12 @@ class MultiLangProtocol {
|
|||
while (statusMessage == null) {
|
||||
Future<Message> future = this.messageReader.getNextMessageFromSTDOUT();
|
||||
try {
|
||||
Message message = future.get();
|
||||
Message message;
|
||||
if (configuration.isTimeoutEnabled()) {
|
||||
message = future.get(configuration.getTimeoutInSeconds(), TimeUnit.SECONDS);
|
||||
} else {
|
||||
message = future.get();
|
||||
}
|
||||
// Note that instanceof doubles as a check against a value being null
|
||||
if (message instanceof CheckpointMessage) {
|
||||
boolean checkpointWriteSucceeded = checkpoint((CheckpointMessage) message, checkpointer).get();
|
||||
|
|
@ -180,6 +190,12 @@ class MultiLangProtocol {
|
|||
log.error(String.format("Failed to get status message for %s action for shard %s", action,
|
||||
initializationInput.getShardId()), e);
|
||||
return false;
|
||||
} catch (TimeoutException e) {
|
||||
log.error(String.format("Timedout to get status message for %s action for shard %s. Terminating...",
|
||||
action,
|
||||
initializationInput.getShardId()),
|
||||
e);
|
||||
Runtime.getRuntime().halt(1);
|
||||
}
|
||||
}
|
||||
return this.validateStatusMessage(statusMessage, action);
|
||||
|
|
|
|||
|
|
@ -24,6 +24,7 @@ import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateExcep
|
|||
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
|
|
@ -65,6 +66,8 @@ public class MultiLangRecordProcessor implements IRecordProcessor, IShutdownNoti
|
|||
|
||||
private MultiLangProtocol protocol;
|
||||
|
||||
private KinesisClientLibConfiguration configuration;
|
||||
|
||||
@Override
|
||||
public void initialize(InitializationInput initializationInput) {
|
||||
try {
|
||||
|
|
@ -87,7 +90,7 @@ public class MultiLangRecordProcessor implements IRecordProcessor, IShutdownNoti
|
|||
// Submit the error reader for execution
|
||||
stderrReadTask = executorService.submit(readSTDERRTask);
|
||||
|
||||
protocol = new MultiLangProtocol(messageReader, messageWriter, initializationInput);
|
||||
protocol = new MultiLangProtocol(messageReader, messageWriter, initializationInput, configuration);
|
||||
if (!protocol.initialize()) {
|
||||
throw new RuntimeException("Failed to initialize child process");
|
||||
}
|
||||
|
|
@ -173,9 +176,9 @@ public class MultiLangRecordProcessor implements IRecordProcessor, IShutdownNoti
|
|||
* An obejct mapper.
|
||||
*/
|
||||
MultiLangRecordProcessor(ProcessBuilder processBuilder, ExecutorService executorService,
|
||||
ObjectMapper objectMapper) {
|
||||
ObjectMapper objectMapper, KinesisClientLibConfiguration configuration) {
|
||||
this(processBuilder, executorService, objectMapper, new MessageWriter(), new MessageReader(),
|
||||
new DrainChildSTDERRTask());
|
||||
new DrainChildSTDERRTask(), configuration);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -195,13 +198,16 @@ public class MultiLangRecordProcessor implements IRecordProcessor, IShutdownNoti
|
|||
* Error reader to read from child process's stderr
|
||||
*/
|
||||
MultiLangRecordProcessor(ProcessBuilder processBuilder, ExecutorService executorService, ObjectMapper objectMapper,
|
||||
MessageWriter messageWriter, MessageReader messageReader, DrainChildSTDERRTask readSTDERRTask) {
|
||||
MessageWriter messageWriter, MessageReader messageReader, DrainChildSTDERRTask readSTDERRTask,
|
||||
KinesisClientLibConfiguration configuration) {
|
||||
this.executorService = executorService;
|
||||
this.processBuilder = processBuilder;
|
||||
this.objectMapper = objectMapper;
|
||||
this.messageWriter = messageWriter;
|
||||
this.messageReader = messageReader;
|
||||
this.readSTDERRTask = readSTDERRTask;
|
||||
this.configuration = configuration;
|
||||
|
||||
|
||||
this.state = ProcessState.ACTIVE;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@ package com.amazonaws.services.kinesis.multilang;
|
|||
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
||||
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
|
|
@ -39,12 +40,15 @@ public class MultiLangRecordProcessorFactory implements IRecordProcessorFactory
|
|||
|
||||
private final ExecutorService executorService;
|
||||
|
||||
private final KinesisClientLibConfiguration configuration;
|
||||
|
||||
/**
|
||||
* @param command The command that will do processing for this factory's record processors.
|
||||
* @param executorService An executor service to use while processing inputs and outputs of the child process.
|
||||
*/
|
||||
public MultiLangRecordProcessorFactory(String command, ExecutorService executorService) {
|
||||
this(command, executorService, new ObjectMapper());
|
||||
public MultiLangRecordProcessorFactory(String command, ExecutorService executorService,
|
||||
KinesisClientLibConfiguration configuration) {
|
||||
this(command, executorService, new ObjectMapper(), configuration);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -52,11 +56,13 @@ public class MultiLangRecordProcessorFactory implements IRecordProcessorFactory
|
|||
* @param executorService An executor service to use while processing inputs and outputs of the child process.
|
||||
* @param objectMapper An object mapper used to convert messages to json to be written to the child process
|
||||
*/
|
||||
public MultiLangRecordProcessorFactory(String command, ExecutorService executorService, ObjectMapper objectMapper) {
|
||||
public MultiLangRecordProcessorFactory(String command, ExecutorService executorService, ObjectMapper objectMapper,
|
||||
KinesisClientLibConfiguration configuration) {
|
||||
this.command = command;
|
||||
this.commandArray = command.split(COMMAND_DELIMETER_REGEX);
|
||||
this.executorService = executorService;
|
||||
this.objectMapper = objectMapper;
|
||||
this.configuration = configuration;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -65,7 +71,8 @@ public class MultiLangRecordProcessorFactory implements IRecordProcessorFactory
|
|||
/*
|
||||
* Giving ProcessBuilder the command as an array of Strings allows users to specify command line arguments.
|
||||
*/
|
||||
return new MultiLangRecordProcessor(new ProcessBuilder(commandArray), executorService, this.objectMapper);
|
||||
return new MultiLangRecordProcessor(new ProcessBuilder(commandArray), executorService, this.objectMapper,
|
||||
this.configuration);
|
||||
}
|
||||
|
||||
String[] getCommandArray() {
|
||||
|
|
|
|||
|
|
@ -31,6 +31,7 @@ import java.util.List;
|
|||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
|
@ -62,6 +63,7 @@ public class MultiLangProtocolTest {
|
|||
private MessageReader messageReader;
|
||||
private String shardId;
|
||||
private IRecordProcessorCheckpointer checkpointer;
|
||||
private KinesisClientLibConfiguration configuration;
|
||||
|
||||
|
||||
|
||||
|
|
@ -70,7 +72,9 @@ public class MultiLangProtocolTest {
|
|||
this.shardId = "shard-id-123";
|
||||
messageWriter = Mockito.mock(MessageWriter.class);
|
||||
messageReader = Mockito.mock(MessageReader.class);
|
||||
protocol = new MultiLangProtocol(messageReader, messageWriter, new InitializationInput().withShardId(shardId));
|
||||
configuration = Mockito.mock(KinesisClientLibConfiguration.class);
|
||||
protocol = new MultiLangProtocol(messageReader, messageWriter, new InitializationInput().withShardId(shardId),
|
||||
configuration);
|
||||
checkpointer = Mockito.mock(IRecordProcessorCheckpointer.class);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -14,16 +14,22 @@
|
|||
*/
|
||||
package com.amazonaws.services.kinesis.multilang;
|
||||
|
||||
import com.amazonaws.auth.AWSCredentialsProvider;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
|
||||
import org.mockito.Mock;
|
||||
|
||||
public class StreamingRecordProcessorFactoryTest {
|
||||
|
||||
@Mock
|
||||
private KinesisClientLibConfiguration configuration;
|
||||
|
||||
@Test
|
||||
public void createProcessorTest() {
|
||||
MultiLangRecordProcessorFactory factory = new MultiLangRecordProcessorFactory("somecommand", null);
|
||||
MultiLangRecordProcessorFactory factory = new MultiLangRecordProcessorFactory("somecommand", null, configuration);
|
||||
IRecordProcessor processor = factory.createProcessor();
|
||||
|
||||
Assert.assertEquals("Should have constructed a StreamingRecordProcessor", MultiLangRecordProcessor.class,
|
||||
|
|
|
|||
|
|
@ -33,6 +33,7 @@ import java.util.concurrent.ExecutorService;
|
|||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
|
|
@ -108,6 +109,9 @@ public class StreamingRecordProcessorTest {
|
|||
|
||||
private MultiLangRecordProcessor recordProcessor;
|
||||
|
||||
@Mock
|
||||
private KinesisClientLibConfiguration configuration;
|
||||
|
||||
@Before
|
||||
public void prepare() throws IOException, InterruptedException, ExecutionException {
|
||||
// Fake command
|
||||
|
|
@ -121,10 +125,11 @@ public class StreamingRecordProcessorTest {
|
|||
messageWriter = Mockito.mock(MessageWriter.class);
|
||||
messageReader = Mockito.mock(MessageReader.class);
|
||||
errorReader = Mockito.mock(DrainChildSTDERRTask.class);
|
||||
when(configuration.isTimeoutEnabled()).thenReturn(false);
|
||||
|
||||
recordProcessor =
|
||||
new MultiLangRecordProcessor(new ProcessBuilder(), executor, new ObjectMapper(), messageWriter,
|
||||
messageReader, errorReader) {
|
||||
messageReader, errorReader, configuration) {
|
||||
|
||||
// Just don't do anything when we exit.
|
||||
void exit() {
|
||||
|
|
@ -166,6 +171,7 @@ public class StreamingRecordProcessorTest {
|
|||
*/
|
||||
when(messageFuture.get()).thenAnswer(answer);
|
||||
when(messageReader.getNextMessageFromSTDOUT()).thenReturn(messageFuture);
|
||||
when(configuration.isTimeoutEnabled()).thenReturn(false);
|
||||
|
||||
List<Record> testRecords = new ArrayList<Record>();
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue