Some more work on the Python stuff

This commit is contained in:
Pfifer, Justin 2016-09-27 09:01:30 -07:00
parent c777ea963a
commit 4b7f8aa3be
4 changed files with 153 additions and 35 deletions

View file

@ -16,17 +16,22 @@ package com.amazonaws.services.kinesis.multilang;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* This class captures the configuration needed to run the MultiLangDaemon.
@ -133,10 +138,29 @@ public class MultiLangDaemonConfig {
private static Properties loadProperties(ClassLoader classLoader, String propertiesFileName) throws IOException {
Properties properties = new Properties();
try (InputStream propertiesStream = new FileInputStream(new File(propertiesFileName))) {
properties.load(propertiesStream);
InputStream propertyStream = null;
try {
propertyStream = classLoader.getResourceAsStream(propertiesFileName);
if (propertyStream == null) {
File propertyFile = new File(propertiesFileName);
if (propertyFile.exists()) {
propertyStream = new FileInputStream(propertyFile);
}
}
if (propertyStream == null) {
throw new FileNotFoundException(
"Unable to find property file in classpath, or file system: '" + propertiesFileName + "'");
}
properties.load(propertyStream);
return properties;
} finally {
if (propertyStream != null) {
propertyStream.close();
}
}
}
private static boolean validateProperties(Properties properties) {
@ -149,13 +173,16 @@ public class MultiLangDaemonConfig {
private static ExecutorService buildExecutorService(Properties properties) {
int maxActiveThreads = getMaxActiveThreads(properties);
ThreadFactoryBuilder builder = new ThreadFactoryBuilder().setNameFormat("multi-lang-daemon-%04d");
LOG.debug(String.format("Value for %s property is %d", PROP_MAX_ACTIVE_THREADS, maxActiveThreads));
if (maxActiveThreads <= 0) {
LOG.info("Using a cached thread pool.");
return Executors.newCachedThreadPool();
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
builder.build());
} else {
LOG.info(String.format("Using a fixed thread pool with %d max active threads.", maxActiveThreads));
return Executors.newFixedThreadPool(maxActiveThreads);
return new ThreadPoolExecutor(maxActiveThreads, maxActiveThreads, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(), builder.build());
}
}

View file

@ -1,7 +1,93 @@
package com.amazonaws.services.kinesis.multilang;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.nullValue;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeDiagnosingMatcher;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
public class Matchers {
public static class InitializationInputMatcher
public static Matcher<InitializationInput> withInit(InitializationInput initializationInput) {
return new InitializationInputMatcher(initializationInput);
}
public static class InitializationInputMatcher extends TypeSafeDiagnosingMatcher<InitializationInput> {
private final Matcher<String> shardIdMatcher;
private final Matcher<ExtendedSequenceNumber> sequenceNumberMatcher;
public InitializationInputMatcher(InitializationInput input) {
shardIdMatcher = equalTo(input.getShardId());
sequenceNumberMatcher = withSequence(input.getExtendedSequenceNumber());
}
@Override
protected boolean matchesSafely(final InitializationInput item, Description mismatchDescription) {
boolean matches = true;
if (!shardIdMatcher.matches(item.getShardId())) {
matches = false;
shardIdMatcher.describeMismatch(item.getShardId(), mismatchDescription);
}
if (!sequenceNumberMatcher.matches(item.getExtendedSequenceNumber())) {
matches = false;
sequenceNumberMatcher.describeMismatch(item, mismatchDescription);
}
return matches;
}
@Override
public void describeTo(Description description) {
description.appendText("An InitializationInput matching: { shardId: ").appendDescriptionOf(shardIdMatcher)
.appendText(", sequenceNumber: ").appendDescriptionOf(sequenceNumberMatcher).appendText(" }");
}
}
public static Matcher<ExtendedSequenceNumber> withSequence(ExtendedSequenceNumber extendedSequenceNumber) {
if (extendedSequenceNumber == null) {
return nullValue(ExtendedSequenceNumber.class);
}
return new ExtendedSequenceNumberMatcher(extendedSequenceNumber);
}
public static class ExtendedSequenceNumberMatcher extends TypeSafeDiagnosingMatcher<ExtendedSequenceNumber> {
private final Matcher<String> sequenceNumberMatcher;
private final Matcher<Long> subSequenceNumberMatcher;
public ExtendedSequenceNumberMatcher(ExtendedSequenceNumber extendedSequenceNumber) {
sequenceNumberMatcher = equalTo(extendedSequenceNumber.getSequenceNumber());
subSequenceNumberMatcher = equalTo(extendedSequenceNumber.getSubSequenceNumber());
}
@Override
protected boolean matchesSafely(ExtendedSequenceNumber item, Description mismatchDescription) {
boolean matches = true;
if (!sequenceNumberMatcher.matches(item.getSequenceNumber())) {
matches = false;
mismatchDescription.appendDescriptionOf(sequenceNumberMatcher);
}
if (!subSequenceNumberMatcher.matches(item.getSubSequenceNumber())) {
matches = false;
mismatchDescription.appendDescriptionOf(subSequenceNumberMatcher);
}
return matches;
}
@Override
public void describeTo(Description description) {
description.appendText("An ExtendedSequenceNumber matching: { sequenceNumber: ")
.appendDescriptionOf(sequenceNumberMatcher).appendText(", subSequenceNumber: ")
.appendDescriptionOf(subSequenceNumberMatcher);
}
}
}

View file

@ -14,6 +14,16 @@
*/
package com.amazonaws.services.kinesis.multilang;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
@ -21,11 +31,6 @@ import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.google.common.util.concurrent.SettableFuture;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
@ -37,21 +42,15 @@ import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibD
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage;
import com.amazonaws.services.kinesis.multilang.messages.Message;
import com.amazonaws.services.kinesis.multilang.messages.ProcessRecordsMessage;
import com.amazonaws.services.kinesis.multilang.messages.StatusMessage;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.google.common.util.concurrent.SettableFuture;
public class MultiLangProtocolTest {
@ -87,7 +86,9 @@ public class MultiLangProtocolTest {
@Test
public void initializeTest() throws InterruptedException, ExecutionException {
when(messageWriter.writeInitializeMessage(new InitializationInput().withShardId(shardId))).thenReturn(buildFuture(true));
when(messageWriter
.writeInitializeMessage(argThat(Matchers.withInit(new InitializationInput().withShardId(shardId)))))
.thenReturn(buildFuture(true));
when(messageReader.getNextMessageFromSTDOUT()).thenReturn(buildFuture(new StatusMessage("initialize"), Message.class));
assertThat(protocol.initialize(), equalTo(true));
}

View file

@ -14,6 +14,15 @@
*/
package com.amazonaws.services.kinesis.multilang;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@ -24,10 +33,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.amazonaws.services.kinesis.multilang.messages.Message;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -43,22 +48,19 @@ import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibD
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.multilang.messages.InitializeMessage;
import com.amazonaws.services.kinesis.multilang.messages.Message;
import com.amazonaws.services.kinesis.multilang.messages.ProcessRecordsMessage;
import com.amazonaws.services.kinesis.multilang.messages.ShutdownMessage;
import com.amazonaws.services.kinesis.multilang.messages.StatusMessage;
import com.fasterxml.jackson.databind.ObjectMapper;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class StreamingRecordProcessorTest {
@ -197,7 +199,8 @@ public class StreamingRecordProcessorTest {
phases(answer);
verify(messageWriter).writeInitializeMessage(new InitializationInput().withShardId(shardId));
verify(messageWriter)
.writeInitializeMessage(argThat(Matchers.withInit(new InitializationInput().withShardId(shardId))));
verify(messageWriter, times(2)).writeProcessRecordsMessage(any(ProcessRecordsInput.class));
verify(messageWriter).writeShutdownMessage(ShutdownReason.ZOMBIE);
}
@ -228,7 +231,8 @@ public class StreamingRecordProcessorTest {
phases(answer);
verify(messageWriter).writeInitializeMessage(new InitializationInput().withShardId(shardId));
verify(messageWriter).writeInitializeMessage(argThat(Matchers.withInit(new InitializationInput()
.withShardId(shardId))));
verify(messageWriter, times(2)).writeProcessRecordsMessage(any(ProcessRecordsInput.class));
verify(messageWriter, never()).writeShutdownMessage(ShutdownReason.ZOMBIE);
Assert.assertEquals(1, systemExitCount);