From 4b7f8aa3be0e11a4769b49629c86ffe1900bf4d5 Mon Sep 17 00:00:00 2001 From: "Pfifer, Justin" Date: Tue, 27 Sep 2016 09:01:30 -0700 Subject: [PATCH] Some more work on the Python stuff --- .../multilang/MultiLangDaemonConfig.java | 37 ++++++-- .../services/kinesis/multilang/Matchers.java | 88 ++++++++++++++++++- .../multilang/MultiLangProtocolTest.java | 31 +++---- .../StreamingRecordProcessorTest.java | 32 ++++--- 4 files changed, 153 insertions(+), 35 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemonConfig.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemonConfig.java index 82eb5f77..f191eedc 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemonConfig.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemonConfig.java @@ -16,17 +16,22 @@ package com.amazonaws.services.kinesis.multilang; import java.io.File; import java.io.FileInputStream; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.util.Properties; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; +import com.google.common.util.concurrent.ThreadFactoryBuilder; /** * This class captures the configuration needed to run the MultiLangDaemon. @@ -133,10 +138,29 @@ public class MultiLangDaemonConfig { private static Properties loadProperties(ClassLoader classLoader, String propertiesFileName) throws IOException { Properties properties = new Properties(); - try (InputStream propertiesStream = new FileInputStream(new File(propertiesFileName))) { - properties.load(propertiesStream); + InputStream propertyStream = null; + try { + propertyStream = classLoader.getResourceAsStream(propertiesFileName); + if (propertyStream == null) { + File propertyFile = new File(propertiesFileName); + if (propertyFile.exists()) { + propertyStream = new FileInputStream(propertyFile); + } + } + + if (propertyStream == null) { + throw new FileNotFoundException( + "Unable to find property file in classpath, or file system: '" + propertiesFileName + "'"); + } + + properties.load(propertyStream); return properties; + } finally { + if (propertyStream != null) { + propertyStream.close(); + } } + } private static boolean validateProperties(Properties properties) { @@ -149,13 +173,16 @@ public class MultiLangDaemonConfig { private static ExecutorService buildExecutorService(Properties properties) { int maxActiveThreads = getMaxActiveThreads(properties); + ThreadFactoryBuilder builder = new ThreadFactoryBuilder().setNameFormat("multi-lang-daemon-%04d"); LOG.debug(String.format("Value for %s property is %d", PROP_MAX_ACTIVE_THREADS, maxActiveThreads)); if (maxActiveThreads <= 0) { LOG.info("Using a cached thread pool."); - return Executors.newCachedThreadPool(); + return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue(), + builder.build()); } else { LOG.info(String.format("Using a fixed thread pool with %d max active threads.", maxActiveThreads)); - return Executors.newFixedThreadPool(maxActiveThreads); + return new ThreadPoolExecutor(maxActiveThreads, maxActiveThreads, 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(), builder.build()); } } diff --git a/src/test/java/com/amazonaws/services/kinesis/multilang/Matchers.java b/src/test/java/com/amazonaws/services/kinesis/multilang/Matchers.java index 3fe7b78a..b84d61a0 100644 --- a/src/test/java/com/amazonaws/services/kinesis/multilang/Matchers.java +++ b/src/test/java/com/amazonaws/services/kinesis/multilang/Matchers.java @@ -1,7 +1,93 @@ package com.amazonaws.services.kinesis.multilang; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.nullValue; + +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeDiagnosingMatcher; + +import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; +import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; + public class Matchers { - public static class InitializationInputMatcher + public static Matcher withInit(InitializationInput initializationInput) { + return new InitializationInputMatcher(initializationInput); + } + + public static class InitializationInputMatcher extends TypeSafeDiagnosingMatcher { + + private final Matcher shardIdMatcher; + private final Matcher sequenceNumberMatcher; + + public InitializationInputMatcher(InitializationInput input) { + shardIdMatcher = equalTo(input.getShardId()); + sequenceNumberMatcher = withSequence(input.getExtendedSequenceNumber()); + } + + @Override + protected boolean matchesSafely(final InitializationInput item, Description mismatchDescription) { + + boolean matches = true; + if (!shardIdMatcher.matches(item.getShardId())) { + matches = false; + shardIdMatcher.describeMismatch(item.getShardId(), mismatchDescription); + } + if (!sequenceNumberMatcher.matches(item.getExtendedSequenceNumber())) { + matches = false; + sequenceNumberMatcher.describeMismatch(item, mismatchDescription); + } + + return matches; + } + + @Override + public void describeTo(Description description) { + description.appendText("An InitializationInput matching: { shardId: ").appendDescriptionOf(shardIdMatcher) + .appendText(", sequenceNumber: ").appendDescriptionOf(sequenceNumberMatcher).appendText(" }"); + } + } + + public static Matcher withSequence(ExtendedSequenceNumber extendedSequenceNumber) { + if (extendedSequenceNumber == null) { + return nullValue(ExtendedSequenceNumber.class); + } + return new ExtendedSequenceNumberMatcher(extendedSequenceNumber); + } + + public static class ExtendedSequenceNumberMatcher extends TypeSafeDiagnosingMatcher { + + private final Matcher sequenceNumberMatcher; + private final Matcher subSequenceNumberMatcher; + + public ExtendedSequenceNumberMatcher(ExtendedSequenceNumber extendedSequenceNumber) { + sequenceNumberMatcher = equalTo(extendedSequenceNumber.getSequenceNumber()); + subSequenceNumberMatcher = equalTo(extendedSequenceNumber.getSubSequenceNumber()); + } + + @Override + protected boolean matchesSafely(ExtendedSequenceNumber item, Description mismatchDescription) { + + boolean matches = true; + if (!sequenceNumberMatcher.matches(item.getSequenceNumber())) { + matches = false; + mismatchDescription.appendDescriptionOf(sequenceNumberMatcher); + } + if (!subSequenceNumberMatcher.matches(item.getSubSequenceNumber())) { + matches = false; + mismatchDescription.appendDescriptionOf(subSequenceNumberMatcher); + } + + return matches; + } + + @Override + public void describeTo(Description description) { + description.appendText("An ExtendedSequenceNumber matching: { sequenceNumber: ") + .appendDescriptionOf(sequenceNumberMatcher).appendText(", subSequenceNumber: ") + .appendDescriptionOf(subSequenceNumberMatcher); + } + } } diff --git a/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolTest.java b/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolTest.java index 8093f815..aa3b6480 100644 --- a/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolTest.java @@ -14,6 +14,16 @@ */ package com.amazonaws.services.kinesis.multilang; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.argThat; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; @@ -21,11 +31,6 @@ import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; -import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; -import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; -import com.google.common.util.concurrent.SettableFuture; -import org.junit.Assert; - import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; @@ -37,21 +42,15 @@ import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibD import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; +import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; +import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason; import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage; import com.amazonaws.services.kinesis.multilang.messages.Message; import com.amazonaws.services.kinesis.multilang.messages.ProcessRecordsMessage; import com.amazonaws.services.kinesis.multilang.messages.StatusMessage; - -import static org.hamcrest.CoreMatchers.equalTo; -import static org.junit.Assert.assertThat; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyLong; -import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.timeout; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import com.google.common.util.concurrent.SettableFuture; public class MultiLangProtocolTest { @@ -87,7 +86,9 @@ public class MultiLangProtocolTest { @Test public void initializeTest() throws InterruptedException, ExecutionException { - when(messageWriter.writeInitializeMessage(new InitializationInput().withShardId(shardId))).thenReturn(buildFuture(true)); + when(messageWriter + .writeInitializeMessage(argThat(Matchers.withInit(new InitializationInput().withShardId(shardId))))) + .thenReturn(buildFuture(true)); when(messageReader.getNextMessageFromSTDOUT()).thenReturn(buildFuture(new StatusMessage("initialize"), Message.class)); assertThat(protocol.initialize(), equalTo(true)); } diff --git a/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java b/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java index c322f806..50c41ad0 100644 --- a/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java @@ -14,6 +14,15 @@ */ package com.amazonaws.services.kinesis.multilang; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.argThat; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -24,10 +33,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; -import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; -import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; -import com.amazonaws.services.kinesis.multilang.messages.Message; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -43,22 +48,19 @@ import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibD import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; +import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; +import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; +import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; +import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason; import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.multilang.messages.InitializeMessage; +import com.amazonaws.services.kinesis.multilang.messages.Message; import com.amazonaws.services.kinesis.multilang.messages.ProcessRecordsMessage; import com.amazonaws.services.kinesis.multilang.messages.ShutdownMessage; import com.amazonaws.services.kinesis.multilang.messages.StatusMessage; import com.fasterxml.jackson.databind.ObjectMapper; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyLong; -import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - @RunWith(MockitoJUnitRunner.class) public class StreamingRecordProcessorTest { @@ -197,7 +199,8 @@ public class StreamingRecordProcessorTest { phases(answer); - verify(messageWriter).writeInitializeMessage(new InitializationInput().withShardId(shardId)); + verify(messageWriter) + .writeInitializeMessage(argThat(Matchers.withInit(new InitializationInput().withShardId(shardId)))); verify(messageWriter, times(2)).writeProcessRecordsMessage(any(ProcessRecordsInput.class)); verify(messageWriter).writeShutdownMessage(ShutdownReason.ZOMBIE); } @@ -228,7 +231,8 @@ public class StreamingRecordProcessorTest { phases(answer); - verify(messageWriter).writeInitializeMessage(new InitializationInput().withShardId(shardId)); + verify(messageWriter).writeInitializeMessage(argThat(Matchers.withInit(new InitializationInput() + .withShardId(shardId)))); verify(messageWriter, times(2)).writeProcessRecordsMessage(any(ProcessRecordsInput.class)); verify(messageWriter, never()).writeShutdownMessage(ShutdownReason.ZOMBIE); Assert.assertEquals(1, systemExitCount);