diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index b708a30a..36e21288 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -203,6 +203,8 @@ public class KinesisClientLibConfiguration { // This is useful for optimizing deployments to large fleets working on a stable stream. private boolean skipShardSyncAtWorkerInitializationIfLeasesExist; private ShardPrioritization shardPrioritization; + private boolean timeoutEnabled; + private int timeoutInSeconds; /** * Constructor. @@ -1075,4 +1077,32 @@ public class KinesisClientLibConfiguration { this.shardPrioritization = shardPrioritization; return this; } + + /** + * @param timeoutEnabled Enable or disbale MultiLangProtocol to wait for the records to be processed + */ + public void withTimeoutEnabled(final boolean timeoutEnabled) { + this.timeoutEnabled = timeoutEnabled; + } + + /** + * @return If timeout is enabled for MultiLangProtocol to wait for records to be processed + */ + public boolean isTimeoutEnabled() { + return timeoutEnabled; + } + + /** + * @param timeoutInSeconds The timeout in seconds to wait for the MultiLangProtocol to wait for + */ + public void withTimeoutInSeconds(final int timeoutInSeconds) { + this.timeoutInSeconds = timeoutInSeconds; + } + + /** + * @return Time for MultiLangProtocol to wait to get response, before throwing an exception. + */ + public int getTimeoutInSeconds() { + return timeoutInSeconds; + } } diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemonConfig.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemonConfig.java index f191eedc..01b3a27f 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemonConfig.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemonConfig.java @@ -100,7 +100,7 @@ public class MultiLangDaemonConfig { kinesisClientLibConfig = configurator.getConfiguration(properties); executorService = buildExecutorService(properties); - recordProcessorFactory = new MultiLangRecordProcessorFactory(executableName, executorService); + recordProcessorFactory = new MultiLangRecordProcessorFactory(executableName, executorService, kinesisClientLibConfig); LOG.info("Running " + kinesisClientLibConfig.getApplicationName() + " to process stream " + kinesisClientLibConfig.getStreamName() + " with executable " + executableName); diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java index 67a97770..86758f2e 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java @@ -16,9 +16,12 @@ package com.amazonaws.services.kinesis.multilang; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; @@ -41,6 +44,7 @@ class MultiLangProtocol { private MessageReader messageReader; private MessageWriter messageWriter; private final InitializationInput initializationInput; + private KinesisClientLibConfiguration configuration; /** * Constructor. @@ -53,10 +57,11 @@ class MultiLangProtocol { * information about the shard this processor is starting to process */ MultiLangProtocol(MessageReader messageReader, MessageWriter messageWriter, - InitializationInput initializationInput) { + InitializationInput initializationInput, KinesisClientLibConfiguration configuration) { this.messageReader = messageReader; this.messageWriter = messageWriter; this.initializationInput = initializationInput; + this.configuration = configuration; } /** @@ -162,7 +167,12 @@ class MultiLangProtocol { while (statusMessage == null) { Future future = this.messageReader.getNextMessageFromSTDOUT(); try { - Message message = future.get(); + Message message; + if (configuration.isTimeoutEnabled()) { + message = future.get(configuration.getTimeoutInSeconds(), TimeUnit.SECONDS); + } else { + message = future.get(); + } // Note that instanceof doubles as a check against a value being null if (message instanceof CheckpointMessage) { boolean checkpointWriteSucceeded = checkpoint((CheckpointMessage) message, checkpointer).get(); @@ -180,6 +190,12 @@ class MultiLangProtocol { log.error(String.format("Failed to get status message for %s action for shard %s", action, initializationInput.getShardId()), e); return false; + } catch (TimeoutException e) { + log.error(String.format("Timedout to get status message for %s action for shard %s. Terminating...", + action, + initializationInput.getShardId()), + e); + Runtime.getRuntime().halt(1); } } return this.validateStatusMessage(statusMessage, action); diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java index 2532771b..bbcf957b 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java @@ -24,6 +24,7 @@ import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateExcep import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -65,6 +66,8 @@ public class MultiLangRecordProcessor implements IRecordProcessor, IShutdownNoti private MultiLangProtocol protocol; + private KinesisClientLibConfiguration configuration; + @Override public void initialize(InitializationInput initializationInput) { try { @@ -87,7 +90,7 @@ public class MultiLangRecordProcessor implements IRecordProcessor, IShutdownNoti // Submit the error reader for execution stderrReadTask = executorService.submit(readSTDERRTask); - protocol = new MultiLangProtocol(messageReader, messageWriter, initializationInput); + protocol = new MultiLangProtocol(messageReader, messageWriter, initializationInput, configuration); if (!protocol.initialize()) { throw new RuntimeException("Failed to initialize child process"); } @@ -173,9 +176,9 @@ public class MultiLangRecordProcessor implements IRecordProcessor, IShutdownNoti * An obejct mapper. */ MultiLangRecordProcessor(ProcessBuilder processBuilder, ExecutorService executorService, - ObjectMapper objectMapper) { + ObjectMapper objectMapper, KinesisClientLibConfiguration configuration) { this(processBuilder, executorService, objectMapper, new MessageWriter(), new MessageReader(), - new DrainChildSTDERRTask()); + new DrainChildSTDERRTask(), configuration); } /** @@ -195,13 +198,16 @@ public class MultiLangRecordProcessor implements IRecordProcessor, IShutdownNoti * Error reader to read from child process's stderr */ MultiLangRecordProcessor(ProcessBuilder processBuilder, ExecutorService executorService, ObjectMapper objectMapper, - MessageWriter messageWriter, MessageReader messageReader, DrainChildSTDERRTask readSTDERRTask) { + MessageWriter messageWriter, MessageReader messageReader, DrainChildSTDERRTask readSTDERRTask, + KinesisClientLibConfiguration configuration) { this.executorService = executorService; this.processBuilder = processBuilder; this.objectMapper = objectMapper; this.messageWriter = messageWriter; this.messageReader = messageReader; this.readSTDERRTask = readSTDERRTask; + this.configuration = configuration; + this.state = ProcessState.ACTIVE; } diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessorFactory.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessorFactory.java index e55217a6..e596abf2 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessorFactory.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessorFactory.java @@ -16,6 +16,7 @@ package com.amazonaws.services.kinesis.multilang; import java.util.concurrent.ExecutorService; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -39,12 +40,15 @@ public class MultiLangRecordProcessorFactory implements IRecordProcessorFactory private final ExecutorService executorService; + private final KinesisClientLibConfiguration configuration; + /** * @param command The command that will do processing for this factory's record processors. * @param executorService An executor service to use while processing inputs and outputs of the child process. */ - public MultiLangRecordProcessorFactory(String command, ExecutorService executorService) { - this(command, executorService, new ObjectMapper()); + public MultiLangRecordProcessorFactory(String command, ExecutorService executorService, + KinesisClientLibConfiguration configuration) { + this(command, executorService, new ObjectMapper(), configuration); } /** @@ -52,11 +56,13 @@ public class MultiLangRecordProcessorFactory implements IRecordProcessorFactory * @param executorService An executor service to use while processing inputs and outputs of the child process. * @param objectMapper An object mapper used to convert messages to json to be written to the child process */ - public MultiLangRecordProcessorFactory(String command, ExecutorService executorService, ObjectMapper objectMapper) { + public MultiLangRecordProcessorFactory(String command, ExecutorService executorService, ObjectMapper objectMapper, + KinesisClientLibConfiguration configuration) { this.command = command; this.commandArray = command.split(COMMAND_DELIMETER_REGEX); this.executorService = executorService; this.objectMapper = objectMapper; + this.configuration = configuration; } @Override @@ -65,7 +71,8 @@ public class MultiLangRecordProcessorFactory implements IRecordProcessorFactory /* * Giving ProcessBuilder the command as an array of Strings allows users to specify command line arguments. */ - return new MultiLangRecordProcessor(new ProcessBuilder(commandArray), executorService, this.objectMapper); + return new MultiLangRecordProcessor(new ProcessBuilder(commandArray), executorService, this.objectMapper, + this.configuration); } String[] getCommandArray() { diff --git a/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolTest.java b/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolTest.java index e1827799..38714518 100644 --- a/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolTest.java @@ -31,6 +31,7 @@ import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; @@ -62,6 +63,7 @@ public class MultiLangProtocolTest { private MessageReader messageReader; private String shardId; private IRecordProcessorCheckpointer checkpointer; + private KinesisClientLibConfiguration configuration; @@ -70,7 +72,9 @@ public class MultiLangProtocolTest { this.shardId = "shard-id-123"; messageWriter = Mockito.mock(MessageWriter.class); messageReader = Mockito.mock(MessageReader.class); - protocol = new MultiLangProtocol(messageReader, messageWriter, new InitializationInput().withShardId(shardId)); + configuration = Mockito.mock(KinesisClientLibConfiguration.class); + protocol = new MultiLangProtocol(messageReader, messageWriter, new InitializationInput().withShardId(shardId), + configuration); checkpointer = Mockito.mock(IRecordProcessorCheckpointer.class); } diff --git a/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorFactoryTest.java b/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorFactoryTest.java index a8f5885b..14093291 100644 --- a/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorFactoryTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorFactoryTest.java @@ -14,16 +14,22 @@ */ package com.amazonaws.services.kinesis.multilang; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; import org.junit.Assert; import org.junit.Test; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; +import org.mockito.Mock; public class StreamingRecordProcessorFactoryTest { + @Mock + private KinesisClientLibConfiguration configuration; + @Test public void createProcessorTest() { - MultiLangRecordProcessorFactory factory = new MultiLangRecordProcessorFactory("somecommand", null); + MultiLangRecordProcessorFactory factory = new MultiLangRecordProcessorFactory("somecommand", null, configuration); IRecordProcessor processor = factory.createProcessor(); Assert.assertEquals("Should have constructed a StreamingRecordProcessor", MultiLangRecordProcessor.class, diff --git a/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java b/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java index 2c02b5e9..810f4ef0 100644 --- a/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java @@ -33,6 +33,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import org.junit.Assert; import org.junit.Before; @@ -108,6 +109,9 @@ public class StreamingRecordProcessorTest { private MultiLangRecordProcessor recordProcessor; + @Mock + private KinesisClientLibConfiguration configuration; + @Before public void prepare() throws IOException, InterruptedException, ExecutionException { // Fake command @@ -121,10 +125,11 @@ public class StreamingRecordProcessorTest { messageWriter = Mockito.mock(MessageWriter.class); messageReader = Mockito.mock(MessageReader.class); errorReader = Mockito.mock(DrainChildSTDERRTask.class); + when(configuration.isTimeoutEnabled()).thenReturn(false); recordProcessor = new MultiLangRecordProcessor(new ProcessBuilder(), executor, new ObjectMapper(), messageWriter, - messageReader, errorReader) { + messageReader, errorReader, configuration) { // Just don't do anything when we exit. void exit() { @@ -166,6 +171,7 @@ public class StreamingRecordProcessorTest { */ when(messageFuture.get()).thenAnswer(answer); when(messageReader.getNextMessageFromSTDOUT()).thenReturn(messageFuture); + when(configuration.isTimeoutEnabled()).thenReturn(false); List testRecords = new ArrayList();