From e80201b04726636cb17e512c3aa587fce5454dac Mon Sep 17 00:00:00 2001 From: Sahil Palvia Date: Wed, 19 Jul 2017 12:40:05 -0700 Subject: [PATCH] Adding test cases for halt jvm code. Made the configuration objects for timeout optional. --- .../worker/KinesisClientLibConfiguration.java | 25 +++------ .../kinesis/multilang/MultiLangProtocol.java | 21 ++++++-- .../multilang/MultiLangProtocolForTests.java | 29 +++++++++++ .../multilang/MultiLangProtocolTest.java | 51 ++++++++++++++++++- .../StreamingRecordProcessorFactoryTest.java | 4 +- .../StreamingRecordProcessorTest.java | 5 +- 6 files changed, 109 insertions(+), 26 deletions(-) create mode 100644 src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolForTests.java diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index 36e21288..3e45f5a4 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -23,7 +23,9 @@ import com.amazonaws.regions.RegionUtils; import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper; import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope; import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; +import com.google.common.base.Optional; import com.google.common.collect.ImmutableSet; +import lombok.Getter; /** * Configuration for the Amazon Kinesis Client Library. @@ -203,8 +205,10 @@ public class KinesisClientLibConfiguration { // This is useful for optimizing deployments to large fleets working on a stable stream. private boolean skipShardSyncAtWorkerInitializationIfLeasesExist; private ShardPrioritization shardPrioritization; - private boolean timeoutEnabled; - private int timeoutInSeconds; + @Getter + private Optional timeoutEnabled = Optional.absent(); + @Getter + private Optional timeoutInSeconds = Optional.absent(); /** * Constructor. @@ -1082,27 +1086,14 @@ public class KinesisClientLibConfiguration { * @param timeoutEnabled Enable or disbale MultiLangProtocol to wait for the records to be processed */ public void withTimeoutEnabled(final boolean timeoutEnabled) { - this.timeoutEnabled = timeoutEnabled; - } - - /** - * @return If timeout is enabled for MultiLangProtocol to wait for records to be processed - */ - public boolean isTimeoutEnabled() { - return timeoutEnabled; + this.timeoutEnabled = Optional.of(timeoutEnabled); } /** * @param timeoutInSeconds The timeout in seconds to wait for the MultiLangProtocol to wait for */ public void withTimeoutInSeconds(final int timeoutInSeconds) { - this.timeoutInSeconds = timeoutInSeconds; + this.timeoutInSeconds = Optional.of(timeoutInSeconds); } - /** - * @return Time for MultiLangProtocol to wait to get response, before throwing an exception. - */ - public int getTimeoutInSeconds() { - return timeoutInSeconds; - } } diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java index 86758f2e..cd5382c8 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java @@ -162,14 +162,17 @@ class MultiLangProtocol { * the original process records request * @return Whether or not this operation succeeded. */ - private boolean waitForStatusMessage(String action, IRecordProcessorCheckpointer checkpointer) { + boolean waitForStatusMessage(String action, IRecordProcessorCheckpointer checkpointer) { StatusMessage statusMessage = null; while (statusMessage == null) { Future future = this.messageReader.getNextMessageFromSTDOUT(); try { Message message; - if (configuration.isTimeoutEnabled()) { - message = future.get(configuration.getTimeoutInSeconds(), TimeUnit.SECONDS); + if (configuration.getTimeoutEnabled().isPresent() && configuration.getTimeoutEnabled().get()) { + if (!configuration.getTimeoutInSeconds().isPresent()) { + throw new IllegalArgumentException("timeoutInSeconds property should be set if timeoutEnabled is true"); + } + message = future.get(configuration.getTimeoutInSeconds().get(), TimeUnit.SECONDS); } else { message = future.get(); } @@ -195,12 +198,22 @@ class MultiLangProtocol { action, initializationInput.getShardId()), e); - Runtime.getRuntime().halt(1); + haltJvm(1); } } return this.validateStatusMessage(statusMessage, action); } + /** + * This method is used to halt the JVM. Use this method with utmost caution, since this method will kill the JVM + * without calling the Shutdown hooks. + * + * @param exitStatus The exit status with which the JVM is to be halted. + */ + protected void haltJvm(int exitStatus) { + Runtime.getRuntime().halt(exitStatus); + } + /** * Utility for confirming that the status message is for the provided action. * diff --git a/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolForTests.java b/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolForTests.java new file mode 100644 index 00000000..220b3368 --- /dev/null +++ b/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolForTests.java @@ -0,0 +1,29 @@ +package com.amazonaws.services.kinesis.multilang; + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; +import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; + +/** + * + */ +public class MultiLangProtocolForTests extends MultiLangProtocol { + /** + * Constructor. + * + * @param messageReader A message reader. + * @param messageWriter A message writer. + * @param initializationInput + * @param configuration + */ + MultiLangProtocolForTests(final MessageReader messageReader, + final MessageWriter messageWriter, + final InitializationInput initializationInput, + final KinesisClientLibConfiguration configuration) { + super(messageReader, messageWriter, initializationInput, configuration); + } + + @Override + protected void haltJvm(final int exitStatus) { + throw new RuntimeException("Halt called"); + } +} diff --git a/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolTest.java b/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolTest.java index 38714518..94dad29c 100644 --- a/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolTest.java @@ -15,11 +15,17 @@ package com.amazonaws.services.kinesis.multilang; import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.argThat; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -30,8 +36,11 @@ import java.util.Iterator; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; +import com.google.common.base.Optional; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; @@ -58,7 +67,7 @@ import com.google.common.util.concurrent.SettableFuture; public class MultiLangProtocolTest { private static final List EMPTY_RECORD_LIST = Collections.emptyList(); - private MultiLangProtocol protocol; + private MultiLangProtocolForTests protocol; private MessageWriter messageWriter; private MessageReader messageReader; private String shardId; @@ -73,9 +82,11 @@ public class MultiLangProtocolTest { messageWriter = Mockito.mock(MessageWriter.class); messageReader = Mockito.mock(MessageReader.class); configuration = Mockito.mock(KinesisClientLibConfiguration.class); - protocol = new MultiLangProtocol(messageReader, messageWriter, new InitializationInput().withShardId(shardId), + protocol = new MultiLangProtocolForTests(messageReader, messageWriter, new InitializationInput().withShardId(shardId), configuration); checkpointer = Mockito.mock(IRecordProcessorCheckpointer.class); + + when(configuration.getTimeoutEnabled()).thenReturn(Optional.absent()); } private Future buildFuture(T value) { @@ -187,4 +198,40 @@ public class MultiLangProtocolTest { })); assertThat(protocol.processRecords(new ProcessRecordsInput().withRecords(EMPTY_RECORD_LIST).withCheckpointer(checkpointer)), equalTo(false)); } + + @Test(expected = RuntimeException.class) + public void waitForStatusMessageTimeoutTest() throws InterruptedException, TimeoutException, ExecutionException { + when(messageWriter.writeProcessRecordsMessage(any(ProcessRecordsInput.class))).thenReturn(buildFuture(true)); + Future future = Mockito.mock(Future.class); + when(messageReader.getNextMessageFromSTDOUT()).thenReturn(future); + when(configuration.getTimeoutEnabled()).thenReturn(Optional.of(true)); + when(configuration.getTimeoutInSeconds()).thenReturn(Optional.of(5)); + when(future.get(anyInt(), eq(TimeUnit.SECONDS))).thenThrow(TimeoutException.class); + protocol = new MultiLangProtocolForTests(messageReader, + messageWriter, + new InitializationInput().withShardId(shardId), + configuration); + + protocol.processRecords(new ProcessRecordsInput().withRecords(EMPTY_RECORD_LIST)); + } + + @Test(expected = IllegalArgumentException.class) + public void waitForStatusMessageTimeoutErrorTest() { + when(messageWriter.writeProcessRecordsMessage(any(ProcessRecordsInput.class))).thenReturn(buildFuture(true)); + when(messageReader.getNextMessageFromSTDOUT()).thenReturn(buildFuture(new StatusMessage("processRecords"), Message.class)); + when(configuration.getTimeoutEnabled()).thenReturn(Optional.of(true)); + when(configuration.getTimeoutInSeconds()).thenReturn(Optional.absent()); + + protocol.processRecords(new ProcessRecordsInput().withRecords(EMPTY_RECORD_LIST)); + } + + @Test + public void waitForStatusMessageSuccessTest() { + when(messageWriter.writeProcessRecordsMessage(any(ProcessRecordsInput.class))).thenReturn(buildFuture(true)); + when(messageReader.getNextMessageFromSTDOUT()).thenReturn(buildFuture(new StatusMessage("processRecords"), Message.class)); + when(configuration.getTimeoutEnabled()).thenReturn(Optional.of(true)); + when(configuration.getTimeoutInSeconds()).thenReturn(Optional.of(5)); + + assertTrue(protocol.processRecords(new ProcessRecordsInput().withRecords(EMPTY_RECORD_LIST))); + } } diff --git a/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorFactoryTest.java b/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorFactoryTest.java index 14093291..aa6aceea 100644 --- a/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorFactoryTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorFactoryTest.java @@ -14,14 +14,16 @@ */ package com.amazonaws.services.kinesis.multilang; -import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; import org.junit.Assert; import org.junit.Test; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; +import org.junit.runner.RunWith; import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; +@RunWith(MockitoJUnitRunner.class) public class StreamingRecordProcessorFactoryTest { @Mock diff --git a/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java b/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java index 810f4ef0..0d09198a 100644 --- a/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java @@ -35,6 +35,7 @@ import java.util.concurrent.Future; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; +import com.google.common.base.Optional; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -125,7 +126,7 @@ public class StreamingRecordProcessorTest { messageWriter = Mockito.mock(MessageWriter.class); messageReader = Mockito.mock(MessageReader.class); errorReader = Mockito.mock(DrainChildSTDERRTask.class); - when(configuration.isTimeoutEnabled()).thenReturn(false); + when(configuration.getTimeoutEnabled()).thenReturn(Optional.of(false)); recordProcessor = new MultiLangRecordProcessor(new ProcessBuilder(), executor, new ObjectMapper(), messageWriter, @@ -171,7 +172,7 @@ public class StreamingRecordProcessorTest { */ when(messageFuture.get()).thenAnswer(answer); when(messageReader.getNextMessageFromSTDOUT()).thenReturn(messageFuture); - when(configuration.isTimeoutEnabled()).thenReturn(false); + when(configuration.getTimeoutEnabled()).thenReturn(Optional.of(false)); List testRecords = new ArrayList();