Addressing code review comments and making appropriate changes.

This commit is contained in:
Sahil Palvia 2017-07-31 11:14:26 -07:00
parent 6f7c04dbd7
commit 6c0f4c1d77
5 changed files with 72 additions and 105 deletions

View file

@ -214,8 +214,6 @@ public class KinesisClientLibConfiguration {
private boolean skipShardSyncAtWorkerInitializationIfLeasesExist; private boolean skipShardSyncAtWorkerInitializationIfLeasesExist;
private ShardPrioritization shardPrioritization; private ShardPrioritization shardPrioritization;
@Getter
private Optional<Boolean> timeoutEnabled = Optional.absent();
@Getter @Getter
private Optional<Integer> timeoutInSeconds = Optional.absent(); private Optional<Integer> timeoutInSeconds = Optional.absent();
@ -1113,13 +1111,6 @@ public class KinesisClientLibConfiguration {
return this; return this;
} }
/**
* @param timeoutEnabled Enable or disbale MultiLangProtocol to wait for the records to be processed
*/
public void withTimeoutEnabled(final boolean timeoutEnabled) {
this.timeoutEnabled = Optional.of(timeoutEnabled);
}
/** /**
* @param timeoutInSeconds The timeout in seconds to wait for the MultiLangProtocol to wait for * @param timeoutInSeconds The timeout in seconds to wait for the MultiLangProtocol to wait for
*/ */

View file

@ -168,10 +168,7 @@ class MultiLangProtocol {
Future<Message> future = this.messageReader.getNextMessageFromSTDOUT(); Future<Message> future = this.messageReader.getNextMessageFromSTDOUT();
try { try {
Message message; Message message;
if (configuration.getTimeoutEnabled().isPresent() && configuration.getTimeoutEnabled().get()) { if (configuration.getTimeoutInSeconds().isPresent() && configuration.getTimeoutInSeconds().get() > 0) {
if (!configuration.getTimeoutInSeconds().isPresent()) {
throw new IllegalArgumentException("timeoutInSeconds property should be set if timeoutEnabled is true");
}
message = future.get(configuration.getTimeoutInSeconds().get(), TimeUnit.SECONDS); message = future.get(configuration.getTimeoutInSeconds().get(), TimeUnit.SECONDS);
} else { } else {
message = future.get(); message = future.get();

View file

@ -1,29 +0,0 @@
package com.amazonaws.services.kinesis.multilang;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
/**
*
*/
public class MultiLangProtocolForTests extends MultiLangProtocol {
/**
* Constructor.
*
* @param messageReader A message reader.
* @param messageWriter A message writer.
* @param initializationInput
* @param configuration
*/
MultiLangProtocolForTests(final MessageReader messageReader,
final MessageWriter messageWriter,
final InitializationInput initializationInput,
final KinesisClientLibConfiguration configuration) {
super(messageReader, messageWriter, initializationInput, configuration);
}
@Override
protected void haltJvm(final int exitStatus) {
throw new RuntimeException("Halt called");
}
}

View file

@ -14,21 +14,30 @@
*/ */
package com.amazonaws.services.kinesis.multilang; package com.amazonaws.services.kinesis.multilang;
import static org.hamcrest.CoreMatchers.equalTo; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import static org.junit.Assert.assertFalse; import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException;
import static org.junit.Assert.assertThat; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import static org.junit.Assert.assertTrue; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import static org.mockito.Matchers.any; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import static org.mockito.Matchers.anyInt; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import static org.mockito.Matchers.anyLong; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import static org.mockito.Matchers.anyString; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import static org.mockito.Matchers.argThat; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import static org.mockito.Matchers.eq; import com.amazonaws.services.kinesis.model.Record;
import static org.mockito.Mockito.doNothing; import com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage;
import static org.mockito.Mockito.spy; import com.amazonaws.services.kinesis.multilang.messages.Message;
import static org.mockito.Mockito.timeout; import com.amazonaws.services.kinesis.multilang.messages.ProcessRecordsMessage;
import static org.mockito.Mockito.verify; import com.amazonaws.services.kinesis.multilang.messages.StatusMessage;
import static org.mockito.Mockito.when; import com.google.common.base.Optional;
import com.google.common.util.concurrent.SettableFuture;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
@ -39,39 +48,33 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; import static org.hamcrest.CoreMatchers.equalTo;
import com.google.common.base.Optional; import static org.junit.Assert.assertThat;
import org.junit.Before; import static org.junit.Assert.assertTrue;
import org.junit.Test; import static org.mockito.Matchers.any;
import org.mockito.Mockito; import static org.mockito.Matchers.anyInt;
import org.mockito.invocation.InvocationOnMock; import static org.mockito.Matchers.anyLong;
import org.mockito.stubbing.Answer; import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.argThat;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import static org.mockito.Matchers.eq;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException; import static org.mockito.Mockito.timeout;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import static org.mockito.Mockito.verify;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException; import static org.mockito.Mockito.when;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage;
import com.amazonaws.services.kinesis.multilang.messages.Message;
import com.amazonaws.services.kinesis.multilang.messages.ProcessRecordsMessage;
import com.amazonaws.services.kinesis.multilang.messages.StatusMessage;
import com.google.common.util.concurrent.SettableFuture;
@RunWith(MockitoJUnitRunner.class)
public class MultiLangProtocolTest { public class MultiLangProtocolTest {
private static final List<Record> EMPTY_RECORD_LIST = Collections.emptyList(); private static final List<Record> EMPTY_RECORD_LIST = Collections.emptyList();
private MultiLangProtocolForTests protocol; @Mock
private MultiLangProtocol protocol;
@Mock
private MessageWriter messageWriter; private MessageWriter messageWriter;
@Mock
private MessageReader messageReader; private MessageReader messageReader;
private String shardId; private String shardId;
@Mock
private IRecordProcessorCheckpointer checkpointer; private IRecordProcessorCheckpointer checkpointer;
@Mock
private KinesisClientLibConfiguration configuration; private KinesisClientLibConfiguration configuration;
@ -79,14 +82,10 @@ public class MultiLangProtocolTest {
@Before @Before
public void setup() { public void setup() {
this.shardId = "shard-id-123"; this.shardId = "shard-id-123";
messageWriter = Mockito.mock(MessageWriter.class); protocol = new MultiLangProtocolForTesting(messageReader, messageWriter,
messageReader = Mockito.mock(MessageReader.class); new InitializationInput().withShardId(shardId), configuration);
configuration = Mockito.mock(KinesisClientLibConfiguration.class);
protocol = new MultiLangProtocolForTests(messageReader, messageWriter, new InitializationInput().withShardId(shardId),
configuration);
checkpointer = Mockito.mock(IRecordProcessorCheckpointer.class);
when(configuration.getTimeoutEnabled()).thenReturn(Optional.<Boolean>absent()); when(configuration.getTimeoutInSeconds()).thenReturn(Optional.absent());
} }
private <T> Future<T> buildFuture(T value) { private <T> Future<T> buildFuture(T value) {
@ -199,15 +198,14 @@ public class MultiLangProtocolTest {
assertThat(protocol.processRecords(new ProcessRecordsInput().withRecords(EMPTY_RECORD_LIST).withCheckpointer(checkpointer)), equalTo(false)); assertThat(protocol.processRecords(new ProcessRecordsInput().withRecords(EMPTY_RECORD_LIST).withCheckpointer(checkpointer)), equalTo(false));
} }
@Test(expected = RuntimeException.class) @Test(expected = NullPointerException.class)
public void waitForStatusMessageTimeoutTest() throws InterruptedException, TimeoutException, ExecutionException { public void waitForStatusMessageTimeoutTest() throws InterruptedException, TimeoutException, ExecutionException {
when(messageWriter.writeProcessRecordsMessage(any(ProcessRecordsInput.class))).thenReturn(buildFuture(true)); when(messageWriter.writeProcessRecordsMessage(any(ProcessRecordsInput.class))).thenReturn(buildFuture(true));
Future<Message> future = Mockito.mock(Future.class); Future<Message> future = Mockito.mock(Future.class);
when(messageReader.getNextMessageFromSTDOUT()).thenReturn(future); when(messageReader.getNextMessageFromSTDOUT()).thenReturn(future);
when(configuration.getTimeoutEnabled()).thenReturn(Optional.of(true));
when(configuration.getTimeoutInSeconds()).thenReturn(Optional.of(5)); when(configuration.getTimeoutInSeconds()).thenReturn(Optional.of(5));
when(future.get(anyInt(), eq(TimeUnit.SECONDS))).thenThrow(TimeoutException.class); when(future.get(anyInt(), eq(TimeUnit.SECONDS))).thenThrow(TimeoutException.class);
protocol = new MultiLangProtocolForTests(messageReader, protocol = new MultiLangProtocolForTesting(messageReader,
messageWriter, messageWriter,
new InitializationInput().withShardId(shardId), new InitializationInput().withShardId(shardId),
configuration); configuration);
@ -215,23 +213,34 @@ public class MultiLangProtocolTest {
protocol.processRecords(new ProcessRecordsInput().withRecords(EMPTY_RECORD_LIST)); protocol.processRecords(new ProcessRecordsInput().withRecords(EMPTY_RECORD_LIST));
} }
@Test(expected = IllegalArgumentException.class)
public void waitForStatusMessageTimeoutErrorTest() {
when(messageWriter.writeProcessRecordsMessage(any(ProcessRecordsInput.class))).thenReturn(buildFuture(true));
when(messageReader.getNextMessageFromSTDOUT()).thenReturn(buildFuture(new StatusMessage("processRecords"), Message.class));
when(configuration.getTimeoutEnabled()).thenReturn(Optional.of(true));
when(configuration.getTimeoutInSeconds()).thenReturn(Optional.<Integer>absent());
protocol.processRecords(new ProcessRecordsInput().withRecords(EMPTY_RECORD_LIST));
}
@Test @Test
public void waitForStatusMessageSuccessTest() { public void waitForStatusMessageSuccessTest() {
when(messageWriter.writeProcessRecordsMessage(any(ProcessRecordsInput.class))).thenReturn(buildFuture(true)); when(messageWriter.writeProcessRecordsMessage(any(ProcessRecordsInput.class))).thenReturn(buildFuture(true));
when(messageReader.getNextMessageFromSTDOUT()).thenReturn(buildFuture(new StatusMessage("processRecords"), Message.class)); when(messageReader.getNextMessageFromSTDOUT()).thenReturn(buildFuture(new StatusMessage("processRecords"), Message.class));
when(configuration.getTimeoutEnabled()).thenReturn(Optional.of(true));
when(configuration.getTimeoutInSeconds()).thenReturn(Optional.of(5)); when(configuration.getTimeoutInSeconds()).thenReturn(Optional.of(5));
assertTrue(protocol.processRecords(new ProcessRecordsInput().withRecords(EMPTY_RECORD_LIST))); assertTrue(protocol.processRecords(new ProcessRecordsInput().withRecords(EMPTY_RECORD_LIST)));
} }
private class MultiLangProtocolForTesting extends MultiLangProtocol {
/**
* Constructor.
*
* @param messageReader A message reader.
* @param messageWriter A message writer.
* @param initializationInput
* @param configuration
*/
MultiLangProtocolForTesting(final MessageReader messageReader,
final MessageWriter messageWriter,
final InitializationInput initializationInput,
final KinesisClientLibConfiguration configuration) {
super(messageReader, messageWriter, initializationInput, configuration);
}
@Override
protected void haltJvm(final int exitStatus) {
throw new NullPointerException();
}
}
} }

View file

@ -126,7 +126,7 @@ public class StreamingRecordProcessorTest {
messageWriter = Mockito.mock(MessageWriter.class); messageWriter = Mockito.mock(MessageWriter.class);
messageReader = Mockito.mock(MessageReader.class); messageReader = Mockito.mock(MessageReader.class);
errorReader = Mockito.mock(DrainChildSTDERRTask.class); errorReader = Mockito.mock(DrainChildSTDERRTask.class);
when(configuration.getTimeoutEnabled()).thenReturn(Optional.of(false)); when(configuration.getTimeoutInSeconds()).thenReturn(Optional.absent());
recordProcessor = recordProcessor =
new MultiLangRecordProcessor(new ProcessBuilder(), executor, new ObjectMapper(), messageWriter, new MultiLangRecordProcessor(new ProcessBuilder(), executor, new ObjectMapper(), messageWriter,
@ -172,7 +172,6 @@ public class StreamingRecordProcessorTest {
*/ */
when(messageFuture.get()).thenAnswer(answer); when(messageFuture.get()).thenAnswer(answer);
when(messageReader.getNextMessageFromSTDOUT()).thenReturn(messageFuture); when(messageReader.getNextMessageFromSTDOUT()).thenReturn(messageFuture);
when(configuration.getTimeoutEnabled()).thenReturn(Optional.of(false));
List<Record> testRecords = new ArrayList<Record>(); List<Record> testRecords = new ArrayList<Record>();