From 6c0f4c1d774d9fb4c4e3c9a11b61d143ce22b16f Mon Sep 17 00:00:00 2001 From: Sahil Palvia Date: Mon, 31 Jul 2017 11:14:26 -0700 Subject: [PATCH] Addressing code review comments and making appropriate changes. --- .../worker/KinesisClientLibConfiguration.java | 9 -- .../kinesis/multilang/MultiLangProtocol.java | 5 +- .../multilang/MultiLangProtocolForTests.java | 29 ---- .../multilang/MultiLangProtocolTest.java | 131 ++++++++++-------- .../StreamingRecordProcessorTest.java | 3 +- 5 files changed, 72 insertions(+), 105 deletions(-) delete mode 100644 src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolForTests.java diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index 3b8a0965..8cc105e1 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -214,8 +214,6 @@ public class KinesisClientLibConfiguration { private boolean skipShardSyncAtWorkerInitializationIfLeasesExist; private ShardPrioritization shardPrioritization; - @Getter - private Optional timeoutEnabled = Optional.absent(); @Getter private Optional timeoutInSeconds = Optional.absent(); @@ -1113,13 +1111,6 @@ public class KinesisClientLibConfiguration { return this; } - /** - * @param timeoutEnabled Enable or disbale MultiLangProtocol to wait for the records to be processed - */ - public void withTimeoutEnabled(final boolean timeoutEnabled) { - this.timeoutEnabled = Optional.of(timeoutEnabled); - } - /** * @param timeoutInSeconds The timeout in seconds to wait for the MultiLangProtocol to wait for */ diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java index cd5382c8..99338968 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java @@ -168,10 +168,7 @@ class MultiLangProtocol { Future future = this.messageReader.getNextMessageFromSTDOUT(); try { Message message; - if (configuration.getTimeoutEnabled().isPresent() && configuration.getTimeoutEnabled().get()) { - if (!configuration.getTimeoutInSeconds().isPresent()) { - throw new IllegalArgumentException("timeoutInSeconds property should be set if timeoutEnabled is true"); - } + if (configuration.getTimeoutInSeconds().isPresent() && configuration.getTimeoutInSeconds().get() > 0) { message = future.get(configuration.getTimeoutInSeconds().get(), TimeUnit.SECONDS); } else { message = future.get(); diff --git a/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolForTests.java b/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolForTests.java deleted file mode 100644 index 220b3368..00000000 --- a/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolForTests.java +++ /dev/null @@ -1,29 +0,0 @@ -package com.amazonaws.services.kinesis.multilang; - -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; -import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; - -/** - * - */ -public class MultiLangProtocolForTests extends MultiLangProtocol { - /** - * Constructor. - * - * @param messageReader A message reader. - * @param messageWriter A message writer. - * @param initializationInput - * @param configuration - */ - MultiLangProtocolForTests(final MessageReader messageReader, - final MessageWriter messageWriter, - final InitializationInput initializationInput, - final KinesisClientLibConfiguration configuration) { - super(messageReader, messageWriter, initializationInput, configuration); - } - - @Override - protected void haltJvm(final int exitStatus) { - throw new RuntimeException("Halt called"); - } -} diff --git a/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolTest.java b/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolTest.java index 94dad29c..4b74e728 100644 --- a/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolTest.java @@ -14,21 +14,30 @@ */ package com.amazonaws.services.kinesis.multilang; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyInt; -import static org.mockito.Matchers.anyLong; -import static org.mockito.Matchers.anyString; -import static org.mockito.Matchers.argThat; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.timeout; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; +import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException; +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; +import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; +import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; +import com.amazonaws.services.kinesis.model.Record; +import com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage; +import com.amazonaws.services.kinesis.multilang.messages.Message; +import com.amazonaws.services.kinesis.multilang.messages.ProcessRecordsMessage; +import com.amazonaws.services.kinesis.multilang.messages.StatusMessage; +import com.google.common.base.Optional; +import com.google.common.util.concurrent.SettableFuture; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.runners.MockitoJUnitRunner; +import org.mockito.stubbing.Answer; import java.util.ArrayList; import java.util.Collections; @@ -39,39 +48,33 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; -import com.google.common.base.Optional; -import org.junit.Before; -import org.junit.Test; -import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; -import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException; -import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; -import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException; -import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; - -import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; -import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; - -import com.amazonaws.services.kinesis.model.Record; -import com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage; -import com.amazonaws.services.kinesis.multilang.messages.Message; -import com.amazonaws.services.kinesis.multilang.messages.ProcessRecordsMessage; -import com.amazonaws.services.kinesis.multilang.messages.StatusMessage; -import com.google.common.util.concurrent.SettableFuture; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.argThat; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +@RunWith(MockitoJUnitRunner.class) public class MultiLangProtocolTest { private static final List EMPTY_RECORD_LIST = Collections.emptyList(); - private MultiLangProtocolForTests protocol; + @Mock + private MultiLangProtocol protocol; + @Mock private MessageWriter messageWriter; + @Mock private MessageReader messageReader; private String shardId; + @Mock private IRecordProcessorCheckpointer checkpointer; + @Mock private KinesisClientLibConfiguration configuration; @@ -79,14 +82,10 @@ public class MultiLangProtocolTest { @Before public void setup() { this.shardId = "shard-id-123"; - messageWriter = Mockito.mock(MessageWriter.class); - messageReader = Mockito.mock(MessageReader.class); - configuration = Mockito.mock(KinesisClientLibConfiguration.class); - protocol = new MultiLangProtocolForTests(messageReader, messageWriter, new InitializationInput().withShardId(shardId), - configuration); - checkpointer = Mockito.mock(IRecordProcessorCheckpointer.class); + protocol = new MultiLangProtocolForTesting(messageReader, messageWriter, + new InitializationInput().withShardId(shardId), configuration); - when(configuration.getTimeoutEnabled()).thenReturn(Optional.absent()); + when(configuration.getTimeoutInSeconds()).thenReturn(Optional.absent()); } private Future buildFuture(T value) { @@ -199,15 +198,14 @@ public class MultiLangProtocolTest { assertThat(protocol.processRecords(new ProcessRecordsInput().withRecords(EMPTY_RECORD_LIST).withCheckpointer(checkpointer)), equalTo(false)); } - @Test(expected = RuntimeException.class) + @Test(expected = NullPointerException.class) public void waitForStatusMessageTimeoutTest() throws InterruptedException, TimeoutException, ExecutionException { when(messageWriter.writeProcessRecordsMessage(any(ProcessRecordsInput.class))).thenReturn(buildFuture(true)); Future future = Mockito.mock(Future.class); when(messageReader.getNextMessageFromSTDOUT()).thenReturn(future); - when(configuration.getTimeoutEnabled()).thenReturn(Optional.of(true)); when(configuration.getTimeoutInSeconds()).thenReturn(Optional.of(5)); when(future.get(anyInt(), eq(TimeUnit.SECONDS))).thenThrow(TimeoutException.class); - protocol = new MultiLangProtocolForTests(messageReader, + protocol = new MultiLangProtocolForTesting(messageReader, messageWriter, new InitializationInput().withShardId(shardId), configuration); @@ -215,23 +213,34 @@ public class MultiLangProtocolTest { protocol.processRecords(new ProcessRecordsInput().withRecords(EMPTY_RECORD_LIST)); } - @Test(expected = IllegalArgumentException.class) - public void waitForStatusMessageTimeoutErrorTest() { - when(messageWriter.writeProcessRecordsMessage(any(ProcessRecordsInput.class))).thenReturn(buildFuture(true)); - when(messageReader.getNextMessageFromSTDOUT()).thenReturn(buildFuture(new StatusMessage("processRecords"), Message.class)); - when(configuration.getTimeoutEnabled()).thenReturn(Optional.of(true)); - when(configuration.getTimeoutInSeconds()).thenReturn(Optional.absent()); - - protocol.processRecords(new ProcessRecordsInput().withRecords(EMPTY_RECORD_LIST)); - } - @Test public void waitForStatusMessageSuccessTest() { when(messageWriter.writeProcessRecordsMessage(any(ProcessRecordsInput.class))).thenReturn(buildFuture(true)); when(messageReader.getNextMessageFromSTDOUT()).thenReturn(buildFuture(new StatusMessage("processRecords"), Message.class)); - when(configuration.getTimeoutEnabled()).thenReturn(Optional.of(true)); when(configuration.getTimeoutInSeconds()).thenReturn(Optional.of(5)); assertTrue(protocol.processRecords(new ProcessRecordsInput().withRecords(EMPTY_RECORD_LIST))); } + + private class MultiLangProtocolForTesting extends MultiLangProtocol { + /** + * Constructor. + * + * @param messageReader A message reader. + * @param messageWriter A message writer. + * @param initializationInput + * @param configuration + */ + MultiLangProtocolForTesting(final MessageReader messageReader, + final MessageWriter messageWriter, + final InitializationInput initializationInput, + final KinesisClientLibConfiguration configuration) { + super(messageReader, messageWriter, initializationInput, configuration); + } + + @Override + protected void haltJvm(final int exitStatus) { + throw new NullPointerException(); + } + } } diff --git a/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java b/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java index 0d09198a..6cb17863 100644 --- a/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java @@ -126,7 +126,7 @@ public class StreamingRecordProcessorTest { messageWriter = Mockito.mock(MessageWriter.class); messageReader = Mockito.mock(MessageReader.class); errorReader = Mockito.mock(DrainChildSTDERRTask.class); - when(configuration.getTimeoutEnabled()).thenReturn(Optional.of(false)); + when(configuration.getTimeoutInSeconds()).thenReturn(Optional.absent()); recordProcessor = new MultiLangRecordProcessor(new ProcessBuilder(), executor, new ObjectMapper(), messageWriter, @@ -172,7 +172,6 @@ public class StreamingRecordProcessorTest { */ when(messageFuture.get()).thenAnswer(answer); when(messageReader.getNextMessageFromSTDOUT()).thenReturn(messageFuture); - when(configuration.getTimeoutEnabled()).thenReturn(Optional.of(false)); List testRecords = new ArrayList();